#!/bin/sh
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# See usage() function below for more details ...
#
# Note that the script uses an external file to setup RabbitMQ policies
# so make sure to create it from an example shipped with the package.
#
#######################################################################
# Initialization:

# Locate and load the OCF shell helper library. OCF_FUNCTIONS_DIR is honoured
# when the caller already set it; otherwise it is derived from OCF_ROOT.
# Both expansions are quoted so a path with spaces or glob characters is
# used verbatim.
: "${OCF_FUNCTIONS_DIR=${OCF_ROOT}/lib/heartbeat}"
. "${OCF_FUNCTIONS_DIR}/ocf-shellfuncs"

#######################################################################

# Fill in some defaults if no values are specified

PATH=/sbin:/usr/sbin:/bin:/usr/bin

# Built-in default for each resource parameter (also advertised by meta_data).
OCF_RESKEY_binary_default="/usr/sbin/rabbitmq-server"
OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl"
OCF_RESKEY_debug_default=false
OCF_RESKEY_username_default="rabbitmq"
OCF_RESKEY_groupname_default="rabbitmq"
OCF_RESKEY_admin_user_default="guest"
OCF_RESKEY_admin_password_default="guest"
OCF_RESKEY_definitions_dump_file_default="/etc/rabbitmq/definitions"
OCF_RESKEY_pid_file_default="/var/run/rabbitmq/pid"
OCF_RESKEY_log_dir_default="/var/log/rabbitmq"
OCF_RESKEY_mnesia_base_default="/var/lib/rabbitmq/mnesia"
OCF_RESKEY_host_ip_default="127.0.0.1"
OCF_RESKEY_node_port_default=5672
OCF_RESKEY_erlang_cookie_default=false
OCF_RESKEY_erlang_cookie_file_default="/var/lib/rabbitmq/.erlang.cookie"
OCF_RESKEY_use_fqdn_default=false
OCF_RESKEY_fqdn_prefix_default=""
OCF_RESKEY_max_rabbitmqctl_timeouts_default=3
OCF_RESKEY_policy_file_default="/usr/local/sbin/set_rabbitmq_policy"

# Apply a default only where the caller left the parameter unset. The
# expansions are quoted so that the resulting value is never word-split or
# glob-expanded while being passed to the ':' no-op.
# NOTE(review): host_ip has a default above but no fallback assignment in
# this list - confirm whether that is intentional.
: "${HA_LOGTAG=lrmd}"
: "${HA_LOGFACILITY=daemon}"
: "${OCF_RESKEY_binary=${OCF_RESKEY_binary_default}}"
: "${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}}"
: "${OCF_RESKEY_debug=${OCF_RESKEY_debug_default}}"
: "${OCF_RESKEY_username=${OCF_RESKEY_username_default}}"
: "${OCF_RESKEY_groupname=${OCF_RESKEY_groupname_default}}"
: "${OCF_RESKEY_admin_user=${OCF_RESKEY_admin_user_default}}"
: "${OCF_RESKEY_admin_password=${OCF_RESKEY_admin_password_default}}"
: "${OCF_RESKEY_definitions_dump_file=${OCF_RESKEY_definitions_dump_file_default}}"
: "${OCF_RESKEY_log_dir=${OCF_RESKEY_log_dir_default}}"
: "${OCF_RESKEY_mnesia_base=${OCF_RESKEY_mnesia_base_default}}"
: "${OCF_RESKEY_pid_file=${OCF_RESKEY_pid_file_default}}"
: "${OCF_RESKEY_node_port=${OCF_RESKEY_node_port_default}}"
: "${OCF_RESKEY_erlang_cookie=${OCF_RESKEY_erlang_cookie_default}}"
: "${OCF_RESKEY_erlang_cookie_file=${OCF_RESKEY_erlang_cookie_file_default}}"
: "${OCF_RESKEY_use_fqdn=${OCF_RESKEY_use_fqdn_default}}"
: "${OCF_RESKEY_fqdn_prefix=${OCF_RESKEY_fqdn_prefix_default}}"
: "${OCF_RESKEY_max_rabbitmqctl_timeouts=${OCF_RESKEY_max_rabbitmqctl_timeouts_default}}"
: "${OCF_RESKEY_policy_file=${OCF_RESKEY_policy_file_default}}"

#######################################################################

# Timing defaults are derived from the operation timeout handed over in
# OCF_RESKEY_CRM_meta_timeout (presumably milliseconds - TODO confirm). An
# unset timeout counts as 0 in the arithmetic, which yields 2 and 30.
OCF_RESKEY_start_time_default=$((OCF_RESKEY_CRM_meta_timeout / 6000 + 2))
: "${OCF_RESKEY_start_time=${OCF_RESKEY_start_time_default}}"
OCF_RESKEY_stop_time_default=${OCF_RESKEY_start_time_default}
: "${OCF_RESKEY_stop_time=${OCF_RESKEY_stop_time_default}}"
OCF_RESKEY_command_timeout_default=""
: "${OCF_RESKEY_command_timeout=${OCF_RESKEY_command_timeout_default}}"
TIMEOUT_ARG=$((OCF_RESKEY_CRM_meta_timeout / 6000 + 30))
# Command prefix used to wrap issued commands: extra timeout(1) arguments
# followed by the computed duration.
COMMAND_TIMEOUT="/usr/bin/timeout ${OCF_RESKEY_command_timeout} ${TIMEOUT_ARG}"

#######################################################################

usage() {
    # Prints the usage summary to stdout. The action list mirrors the
    # <actions> section emitted by meta_data().
    cat <<UEND
        usage: $0 (start|stop|promote|demote|notify|validate-all|meta-data|status|monitor)

        $0 manages an ${OCF_RESKEY_binary} process as an HA resource

        The 'start' operation starts the RabbitMQ server.
        The 'stop' operation stops the RabbitMQ server.
        The 'promote' operation promotes the resource to the master role
        The 'demote' operation demotes the resource from the master role
        The 'notify' operation handles cluster notifications about other instances
        The 'validate-all' operation reports whether the parameters are valid
        The 'meta-data' operation reports this RA's meta-data information
        The 'status' operation reports whether the RabbitMQ server is running
        The 'monitor' operation reports whether the RabbitMQ server seems to be working

UEND
}

meta_data() {
    # The EXTENDED_OCF_PARAMS parameter below does not exist by default
    # and hence converted to an empty string unless overridden. It
    # could be used by an extention script to add new parameters. For
    # example see https://review.openstack.org/#/c/249180/10

    cat <<END
<?xml version="1.0"?>
<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
<resource-agent name="${OCF_RESKEY_binary}">
<version>1.0</version>

<longdesc lang="en">
Resource agent for ${OCF_RESKEY_binary}
</longdesc>
<shortdesc lang="en">Resource agent for ${OCF_RESKEY_binary}</shortdesc>
<parameters>

<parameter name="binary" unique="0" required="0">
<longdesc lang="en">
RabbitMQ binary
</longdesc>
<shortdesc lang="en">RabbitMQ binary</shortdesc>
<content type="string" default="${OCF_RESKEY_binary_default}" />
</parameter>

<parameter name="ctl" unique="0" required="0">
<longdesc lang="en">
rabbitctl binary
</longdesc>
<shortdesc lang="en">rabbitctl binary binary</shortdesc>
<content type="string" default="${OCF_RESKEY_ctl_default}" />
</parameter>

<parameter name="pid_file" unique="0" required="0">
<longdesc lang="en">
RabbitMQ PID file
</longdesc>
<shortdesc lang="en">RabbitMQ PID file</shortdesc>
<content type="string" default="${OCF_RESKEY_pid_file_default}" />
</parameter>

<parameter name="log_dir" unique="0" required="0">
<longdesc lang="en">
RabbitMQ log directory
</longdesc>
<shortdesc lang="en">RabbitMQ log directory</shortdesc>
<content type="string" default="${OCF_RESKEY_log_dir_default}" />
</parameter>

<parameter name="username" unique="0" required="0">
<longdesc lang="en">
RabbitMQ user name
</longdesc>
<shortdesc lang="en">RabbitMQ user name</shortdesc>
<content type="string" default="${OCF_RESKEY_username_default}" />
</parameter>

<parameter name="groupname" unique="0" required="0">
<longdesc lang="en">
RabbitMQ group name
</longdesc>
<shortdesc lang="en">RabbitMQ group name</shortdesc>
<content type="string" default="${OCF_RESKEY_groupname_default}" />
</parameter>

<parameter name="admin_user" unique="0" required="0">
<longdesc lang="en">
RabbitMQ default admin user for API
</longdesc>
<shortdesc lang="en">RabbitMQ admin user</shortdesc>
<content type="string" default="${OCF_RESKEY_admin_user_default}" />
</parameter>

<parameter name="admin_password" unique="0" required="0">
<longdesc lang="en">
RabbitMQ default admin user password for API
</longdesc>
<shortdesc lang="en">RabbitMQ admin password</shortdesc>
<content type="string" default="${OCF_RESKEY_admin_password_default}" />
</parameter>

<parameter name="definitions_dump_file" unique="0" required="0">
<longdesc lang="en">
RabbitMQ default definitions dump file
</longdesc>
<shortdesc lang="en">RabbitMQ definitions dump file</shortdesc>
<content type="string" default="${OCF_RESKEY_definitions_dump_file}" />
</parameter>

<parameter name="command_timeout" unique="0" required="0">
<longdesc lang="en">
Timeout command arguments for issued commands termination (value is auto evaluated)
</longdesc>
<shortdesc lang="en">Arguments for timeout wrapping command</shortdesc>
<content type="string" default="${OCF_RESKEY_command_timeout_default}" />
</parameter>

<parameter name="start_time" unique="0" required="0">
<longdesc lang="en">
Timeout for start rabbitmq server
</longdesc>
<shortdesc lang="en">Timeout for start rabbitmq server</shortdesc>
<content type="string" default="${OCF_RESKEY_start_time_default}" />
</parameter>

<parameter name="stop_time" unique="0" required="0">
<longdesc lang="en">
Timeout for stopping rabbitmq server
</longdesc>
<shortdesc lang="en">Timeout for stopping rabbitmq server</shortdesc>
<content type="string" default="${OCF_RESKEY_stop_time_default}" />
</parameter>

<parameter name="debug" unique="0" required="0">
<longdesc lang="en">
The debug flag for agent (${OCF_RESKEY_binary}) instance.
In the /tmp/ directory will be created rmq-* files for log
some operations and ENV values inside OCF-script.
</longdesc>
<shortdesc lang="en">AMQP server (${OCF_RESKEY_binary}) debug flag</shortdesc>
<content type="boolean" default="${OCF_RESKEY_debug_default}" />
</parameter>

<parameter name="mnesia_base" unique="0" required="0">
<longdesc lang="en">
Base directory for storing Mnesia files
</longdesc>
<shortdesc lang="en">Base directory for storing Mnesia files</shortdesc>
<content type="boolean" default="${OCF_RESKEY_mnesia_base_default}" />
</parameter>

<parameter name="host_ip" unique="0" required="0">
<longdesc lang="en">
${OCF_RESKEY_binary} should listen on this IP address
</longdesc>
<shortdesc lang="en">${OCF_RESKEY_binary} should listen on this IP address</shortdesc>
<content type="boolean" default="${OCF_RESKEY_host_ip_default}" />
</parameter>

<parameter name="node_port" unique="0" required="0">
<longdesc lang="en">
${OCF_RESKEY_binary} should listen on this port
</longdesc>
<shortdesc lang="en">${OCF_RESKEY_binary} should listen on this port</shortdesc>
<content type="boolean" default="${OCF_RESKEY_node_port_default}" />
</parameter>

<parameter name="erlang_cookie" unique="0" required="0">
<longdesc lang="en">
Erlang cookie for clustering. If specified, will be updated at the mnesia reset
</longdesc>
<shortdesc lang="en">Erlang cookie</shortdesc>
<content type="boolean" default="${OCF_RESKEY_erlang_cookie_default}" />
</parameter>

<parameter name="erlang_cookie_file" unique="0" required="0">
<longdesc lang="en">
Erlang cookie file path where the cookie will be put, if requested
</longdesc>
<shortdesc lang="en">Erlang cookie file</shortdesc>
<content type="boolean" default="${OCF_RESKEY_erlang_cookie_file_default}" />
</parameter>

<parameter name="use_fqdn" unique="0" required="0">
<longdesc lang="en">
Either to use FQDN or a shortname for the rabbitmq node
</longdesc>
<shortdesc lang="en">Use FQDN</shortdesc>
<content type="boolean" default="${OCF_RESKEY_use_fqdn_default}" />
</parameter>

<parameter name="fqdn_prefix" unique="0" required="0">
<longdesc lang="en">
Optional FQDN prefix for RabbitMQ nodes in cluster.
FQDN prefix can be specified to host multiple RabbitMQ instances on a node or
in case of RabbitMQ running in dedicated network/interface.
</longdesc>
<shortdesc lang="en">FQDN prefix</shortdesc>
<content type="string" default="${OCF_RESKEY_fqdn_prefix_default}" />
</parameter>

<parameter name="max_rabbitmqctl_timeouts" unique="0" required="0">
<longdesc lang="en">
If during monitor call rabbitmqctl times out, the timeout is ignored
unless it is Nth timeout in a row. Here N is the value of the current parameter.
If too many timeouts happen in a raw, the monitor call will return with error.
</longdesc>
<shortdesc lang="en">Fail only if that many rabbitmqctl timeouts in a row occurred</shortdesc>
<content type="string" default="${OCF_RESKEY_max_rabbitmqctl_timeouts_default}" />
</parameter>

<parameter name="policy_file" unique="0" required="0">
<longdesc lang="en">
A path to the shell script to setup RabbitMQ policies
</longdesc>
<shortdesc lang="en">A policy file path</shortdesc>
<content type="string" default="${OCF_RESKEY_policy_file_default}" />
</parameter>

$EXTENDED_OCF_PARAMS

</parameters>

<actions>
<action name="start" timeout="20" />
<action name="stop" timeout="20" />
<action name="status" timeout="20" />
<action name="monitor" depth="0" timeout="30" interval="5" />
<action name="monitor" depth="0" timeout="30" interval="3" role="Master"/>
<action name="monitor" depth="30" timeout="60" interval="103" />
<action name="promote" timeout="30" />
<action name="demote"  timeout="30" />
<action name="notify"   timeout="20" />
<action name="validate-all" timeout="5" />
<action name="meta-data" timeout="5" />
</actions>
</resource-agent>
END
}


# Master score constants.
# NOTE(review): not referenced in the part of the file reviewed here;
# presumably the lowest and the preferred promotion scores handed to
# master_score() - confirm against the callers.
MIN_MASTER_SCORE=100
BEST_MASTER_SCORE=1000


#######################################################################
# Functions invoked by resource manager actions

#TODO(bogdando) move proc_kill, proc_stop to shared OCF functions
#  to be shipped with HA cluster packages
###########################################################
# Attempts to kill a process with retries and checks procfs
# to make sure the process is stopped.
#
# With a pid, the signal is sent to the whole process group of
# that pid. With the pid "none", processes are matched by the
# service name against their full command line instead.
#
# Globals:
#   LL
# Arguments:
#   $1 - pid of the process to try and kill
#   $2 - service name used for logging and match-based kill, if the pid is "none"
#   $3 - signal to use, defaults to SIGTERM
#   $4 - number of retries, defaults to 5
#   $5 - time to sleep between retries, defaults to 2
# Returns:
#   0 - if successful, or if there was nothing left to kill
#   1 - if process is still running according to procfs
#   2 - if invalid parameters passed in (empty pid, pid or process group 1)
###########################################################
proc_kill()
{
    local pid="${1}"
    local service_name="${2}"
    local signal="${3:-SIGTERM}"
    local count="${4:-5}"
    local process_sleep="${5:-2}"
    local LH="${LL} proc_kill():"
    local pgrp
    local matched

    # An empty pid would make ps list every process and "/proc/" always
    # exists, so it can never be handled sanely below.
    if [ -z "${pid}" ] ; then
        ocf_log err "${LH} no pid was given for ${service_name}!"
        return 2
    fi

    if [ "${pid}" = "none" ]; then
        # The pattern is quoted: it usually contains '*' and must reach
        # pgrep/pkill verbatim instead of being glob-expanded by the shell.
        matched="$(pgrep -fla "${service_name}")"
        if [ -z "${matched}" ] ; then
            # Nothing to kill means the goal is already reached; reporting
            # an error here would make callers treat a dead service as a
            # failed stop.
            ocf_log info "${LH} cannot find any processes matching the ${service_name}, considering target process to be already dead"
            return 0
        fi
        ocf_log debug "${LH} no pid provided, will try the ${service_name}, matched list: ${matched}"
        # Re-check the match list right after each sleep, so that no extra
        # kill/sleep round is spent on processes that are already gone.
        while [ "${count}" -gt 0 ] && [ -n "${matched}" ]; do
            ocf_log debug "${LH} Stopping ${service_name} with ${signal}..."
            ocf_run pkill -f -"${signal}" "${service_name}"
            sleep "${process_sleep}"
            count=$(( count-1 ))
            matched="$(pgrep -fla "${service_name}")"
        done
        if pgrep -f "${service_name}" > /dev/null ; then
            ocf_log warn "${LH} Failed to stop ${service_name} with ${signal}"
            return 1
        fi
        ocf_log debug "${LH} Stopped ${service_name} with ${signal}"
        return 0
    fi

    # pid is not none
    pgrp="$(ps -o pgid= "${pid}" 2>/dev/null | tr -d '[:space:]')"
    if [ "${pid}" = "1" ] || [ "${pgrp}" = "1" ] ; then
        ocf_log err "${LH} shall not kill by the bad pid 1 (init)!"
        return 2
    fi

    while [ "${count}" -gt 0 ] && [ -d "/proc/${pid}" ]; do
        ocf_log debug "${LH} Stopping ${service_name} with ${signal}..."
        ocf_run pkill -"${signal}" -g "${pgrp}"
        sleep "${process_sleep}"
        count=$(( count-1 ))
    done

    # Check if the process ended after the last sleep
    if [ ! -d "/proc/${pid}" ] ; then
        ocf_log debug "${LH} Stopped ${service_name} with ${signal}"
        return 0
    fi

    ocf_log warn "${LH} Failed to stop ${service_name} with ${signal}"
    return 1
}

###########################################################
# Attempts to kill a process with the given pid or pid file
# using proc_kill and will retry with sigkill if sigterm is
# unsuccessful.
#
# Globals:
#   OCF_ERR_GENERIC
#   OCF_SUCCESS
#   LL
# Arguments:
#   $1 - pidfile or pid or 'none', if stopping by the name matching
#   $2 - service name used for logging or for the fallback stopping method
#   $3 - stop process timeout (in sec), used to determine how many times we try
#        SIGTERM and an upper limit on how long this function should try and
#        stop the process. Defaults to 15.
# Returns:
#   OCF_SUCCESS - if successful
#   OCF_ERR_GENERIC - if process is still running according to procfs
###########################################################
proc_stop()
{
    local pid_param="${1}"
    local service_name="${2}"
    local timeout="${3:-15}"
    local LH="${LL} proc_stop():"
    local i
    local pid
    local pidfile
    local stop_count

    case "${pid_param}" in
        none)
            pid="none"
            ;;
        ''|*[!0-9]*)
            # not a plain number: either a pid file or nothing usable
            if [ -e "${pid_param}" ]; then
                pidfile="${pid_param}"
                # a pid file may list several pids; one per line, deduplicated
                pid=$(cat "${pidfile}" 2>/dev/null | tr -s " " "\n" | sort -u)
            else
                ocf_log warn "${LH} pid param ${pid_param} is not a file or a number, try match by ${service_name}"
                pid="none"
            fi
            ;;
        *)
            # just a number
            pid="${pid_param}"
            ;;
    esac

    # number of times to try a SIGTERM is (timeout - 5 seconds) / 2 seconds
    stop_count=$(( (timeout-5)/2 ))

    # make sure we stop at least once
    if [ "${stop_count}" -le 0 ]; then
        stop_count=1
    fi

    # an empty or unreadable pid file falls back to the name matching
    if [ -z "${pid}" ] ; then
        ocf_log warn "${LH} unable to get PID from ${pidfile}, try match by ${service_name}"
        pid="none"
    fi

    # ${pid} is split on purpose: it holds one entry per pid found
    for i in ${pid} ; do
        ocf_log info "${LH} Stopping ${service_name} by PID ${i}"
        if ! proc_kill "${i}" "${service_name}" SIGTERM "${stop_count}" ; then
            # SIGTERM failed, send a single SIGKILL
            if ! proc_kill "${i}" "${service_name}" SIGKILL 1 2 ; then
                ocf_log err "${LH} ERROR: could not stop ${service_name}"
                return "${OCF_ERR_GENERIC}"
            fi
        fi
    done

    # Remove the pid file here which will remove empty pid files as well
    if [ -n "${pidfile}" ]; then
        rm -f "${pidfile}"
    fi

    ocf_log info "${LH} Stopped ${service_name}"
    return "${OCF_SUCCESS}"
}

# Invokes the given command as a rabbitmq user and wrapped in the
# timeout command.
su_rabbit_cmd() {
    # Usage: su_rabbit_cmd [-t <duration>] [<command line>]
    # Runs the command line (default: "status") through su as the
    # configured user, prefixed with /usr/bin/timeout. With "-t" the given
    # duration replaces the one baked into COMMAND_TIMEOUT.
    # Returns the exit status of su.
    local LH="${LL} su_rabbit_cmd():"
    local user=$OCF_RESKEY_username
    local home=/var/lib/rabbitmq
    local pwd=/var/lib/rabbitmq
    local mail=/var/spool/mail/rabbitmq
    local timeout="${COMMAND_TIMEOUT}"
    local cmd
    local env_vars
    local rc=1

    case "$1" in
        -t)
            timeout="/usr/bin/timeout ${OCF_RESKEY_command_timeout} $2"
            shift 2
            ;;
    esac
    cmd="${1:-status}"

    # environment handed to the command, since su is not a login shell here
    env_vars="USER=${user} MAIL=${mail} PWD=${pwd} HOME=${home} LOGNAME=${user}"

    ocf_log debug "${LH} invoking a command: ${cmd}"
    su $user -s /bin/sh -c "${env_vars}       ${timeout} ${cmd}"
    rc=$?
    ocf_log info "${LH} the invoked command exited ${rc}: ${cmd}"
    return $rc
}

now() {
    # current time as seconds since the epoch
    date -u '+%s'
}

master_score() {
    # Sets the master score via "crm_master -l reboot".
    # Arguments:
    #   $1 - the score, defaults to 0 when omitted or empty
    # Returns:
    #   OCF_SUCCESS, or OCF_ERR_GENERIC if the crm_master run fails
    local LH="${LL} master_score():"
    local score="${1:-0}"

    ocf_log info "${LH} Updating master score attribute with ${score}"
    # quoted, so the score always reaches crm_master as exactly one argument
    ocf_run crm_master -l reboot -v "${score}" || return $OCF_ERR_GENERIC
    return $OCF_SUCCESS
}

# Return either FQDN or shortname, depends on the OCF_RESKEY_use_fqdn.
get_hostname() {
    # "hostname -s" when use_fqdn is exactly 'false', "hostname -f" for
    # any other value.
    local name_flag
    case "${OCF_RESKEY_use_fqdn}" in
        false) name_flag='-s' ;;
        *)     name_flag='-f' ;;
    esac
    echo "$(hostname ${name_flag})"
}

# Strip the FQDN to the shortname, if OCF_RESKEY_use_fqdn was set;
# Prepend prefix to the hostname
process_fqdn() {
    # $1 - host name; the configured prefix is put in front of it first.
    # With use_fqdn set to 'false' only the part before the first dot of
    # the prefixed name is printed, otherwise the prefixed name as is.
    local prefixed="${OCF_RESKEY_fqdn_prefix}$1"
    case "${OCF_RESKEY_use_fqdn}" in
        false)
            echo "${prefixed}" | awk -F. '{print $1}'
            ;;
        *)
            echo "${prefixed}"
            ;;
    esac
}

# Return OCF_SUCCESS, if current host is in the list of given hosts.
# Otherwise, return 10
my_host() {
    # $1 - whitespace separated list of host names.
    # Both this host's name and every list entry go through process_fqdn
    # before being compared, so short and fully qualified spellings are
    # normalized the same way.
    # Returns OCF_SUCCESS on the first match, 10 when nothing matched.
    local hostlist="$1"
    local hostname
    local host
    local hn
    local rc=10
    local LH="${LL} my_host():"

    hostname=$(process_fqdn "$(get_hostname)")
    ocf_log info "${LH} hostlist is: $hostlist"
    # $hostlist is split on purpose, it is a list of names
    for host in $hostlist ; do
        hn=$(process_fqdn "${host}")
        ocf_log debug "${LH} comparing '$hostname' with '$hn'"
        if [ "${hostname}" = "${hn}" ] ; then
            rc=$OCF_SUCCESS
            break
        fi
    done

    return $rc
}

get_integer_node_attr() {
    # $1 - node name, $2 - attribute name.
    # Queries the reboot-lifetime attribute with crm_attribute and prints
    # the part after '=' of the third output field; prints 0 when that is
    # "(null)", empty, or when the extraction fails.
    local attr_value
    attr_value=$(crm_attribute -N $1 -l reboot --name "$2" --query 2>/dev/null | awk '{ split($3, vals, "="); if (vals[2] != "(null)") print vals[2] }')
    if [ $? -eq 0 ] && [ -n "$attr_value" ] ; then
        echo $attr_value
    else
        echo 0
    fi
}

get_node_start_time() {
    # $1 - node name; prints its 'rabbit-start-time' attribute (0 if unset)
    local attr_name='rabbit-start-time'
    get_integer_node_attr $1 "${attr_name}"
}

get_node_master_score() {
    # $1 - node name; prints its 'master-p_rabbitmq-server' attribute
    # (0 if unset).
    # NOTE(review): the attribute name embeds a fixed resource name -
    # confirm it matches the name the resource is configured with.
    local attr_name='master-p_rabbitmq-server'
    get_integer_node_attr $1 "${attr_name}"
}

# Return either rabbit node name as FQDN or shortname, depends on the OCF_RESKEY_use_fqdn.
rabbit_node_name() {
    # $1 - host name; prints "rabbit@" followed by what process_fqdn
    # makes of that name.
    local host_part
    host_part=$(process_fqdn $1)
    echo "rabbit@${host_part}"
}

rmq_setup_env() {
    # Prepares the environment shared by the actions:
    #  - exports RABBITMQ_NODENAME, RABBITMQ_NODE_PORT, RABBITMQ_PID_FILE
    #    and LL (the log prefix)
    #  - sets the globals MNESIA_FILES, RMQ_START_TIME, MASTER_FLAG_FILE,
    #    THIS_PCMK_NODE and TOTALVMEM
    #  - creates the pid file directory if missing and hands directories
    #    that the configured user cannot fully write to back to that user
    #  - finally refreshes the erlang cookie through update_cookie
    local H
    local dir
    local files
    local PID_DIR

    H="$(get_hostname)"
    # Assign first and export afterwards: a command substitution inside
    # "export VAR=$(...)" is word-split by some sh implementations.
    RABBITMQ_NODENAME="$(rabbit_node_name "${H}")"
    RABBITMQ_NODE_PORT="${OCF_RESKEY_node_port}"
    RABBITMQ_PID_FILE="${OCF_RESKEY_pid_file}"
    export RABBITMQ_NODENAME RABBITMQ_NODE_PORT RABBITMQ_PID_FILE
    MNESIA_FILES="${OCF_RESKEY_mnesia_base}/${RABBITMQ_NODENAME}"
    RMQ_START_TIME="${MNESIA_FILES}/ocf_server_start_time.txt"
    MASTER_FLAG_FILE="${MNESIA_FILES}/ocf_master_for_${OCF_RESOURCE_INSTANCE}"
    THIS_PCMK_NODE="$(crm_node -n)"
    TOTALVMEM="$(free -mt | awk '/Total:/ {print $2}')"

    # check and make PID file dir
    PID_DIR="$(dirname "${OCF_RESKEY_pid_file}")"
    if [ ! -d "${PID_DIR}" ] ; then
        mkdir -p "${PID_DIR}"
        chown -R "${OCF_RESKEY_username}:${OCF_RESKEY_groupname}" "${PID_DIR}"
        chmod 755 "${PID_DIR}"
    fi

    # Regardless of whether we just created the directory or it
    # already existed, check whether it is writable by the configured
    # user
    for dir in "${PID_DIR}" "${OCF_RESKEY_mnesia_base}" "${OCF_RESKEY_log_dir}"; do
        if test -e "${dir}"; then
            # any output means at least one entry is not writable
            files=$(su -s /bin/sh - "${OCF_RESKEY_username}" -c "find ${dir} ! -writable")
            if [ "${files}" ]; then
                ocf_log warn "Directory ${dir} is not writable by ${OCF_RESKEY_username}, chowning."
                chown -R "${OCF_RESKEY_username}:${OCF_RESKEY_groupname}" "${dir}"
            fi
        fi
    done

    export LL="${OCF_RESOURCE_INSTANCE}[$$]:"
    update_cookie
}

# Return a RabbitMQ node to its virgin state.
# For reset and force_reset to succeed the RabbitMQ application must have been stopped.
# If the app cannot be stopped, beam will be killed and mnesia files will be removed.
reset_mnesia() {
    # Flow:
    #  1. no beam process        -> wipe the mnesia files
    #  2. rabbit app is running  -> stop it; if that fails, wipe
    #  3. otherwise run "reset", then "force_reset" if that fails;
    #     if both fail, wipe
    # "Wipe" kills whatever is left of the server and removes MNESIA_FILES.
    # The function reports OCF_SUCCESS whatever happened.
    local LH="${LL} reset_mnesia():"
    local wipe_files=false

    if get_status ; then
        # beam is running; the rabbit app has to be stopped before a reset
        if get_status rabbit ; then
            ocf_log info "${LH} Stopping RMQ-app prior to reset the mnesia."
            if ! stop_rmq_server_app ; then
                ocf_log warn "${LH} RMQ-app can't be stopped."
                wipe_files=true
            fi
        fi

        if [ "${wipe_files}" = false ] ; then
            # rabbit app is not running, reset mnesia
            ocf_log info "${LH} Execute reset with timeout: ${TIMEOUT_ARG}"
            if ! su_rabbit_cmd "${OCF_RESKEY_ctl} reset" ; then
                ocf_log info "${LH} Execute force_reset with timeout: ${TIMEOUT_ARG}"
                if ! su_rabbit_cmd "${OCF_RESKEY_ctl} force_reset" ; then
                    ocf_log warn "${LH} Mnesia couldn't cleaned, even by force-reset command."
                    wipe_files=true
                fi
            fi
        fi
    else
        # there is no beam running
        wipe_files=true
        ocf_log warn "${LH} There is no Beam process running."
    fi

    # remove mnesia files, if required
    if [ "${wipe_files}" = true ] ; then
        kill_rmq_and_remove_pid
        ocf_run rm -rf "${MNESIA_FILES}"
        ocf_log warn "${LH} Mnesia files appear corrupted and have been removed from ${MNESIA_FILES}."
    fi

    # always return OCF SUCCESS
    return $OCF_SUCCESS
}


block_client_access()
{
    # Inserts an iptables rule (tagged 'temporary RMQ block') that rejects
    # TCP traffic to the node port.
    # Returns:
    #   OCF_SUCCESS     - the rule is in place (it may have been there already)
    #   OCF_ERR_GENERIC - the rule is still missing after 5 attempts
    #
    # do not add temporary RMQ blocking rule, if it is already exist
    # otherwise, try to add a blocking rule with max of 5 retries
    local tries=5

    # The result is decided by whether the rule is present, not by the
    # number of tries left, so a rule that lands on the very last attempt
    # still counts as success.
    until iptables -nvL --wait | grep -q 'temporary RMQ block' ; do
        if [ "${tries}" -le 0 ]; then
            return $OCF_ERR_GENERIC
        fi
        tries=$((tries-1))
        iptables --wait -I INPUT -p tcp -m tcp --dport "${OCF_RESKEY_node_port}" -m state --state NEW,RELATED,ESTABLISHED \
            -m comment --comment 'temporary RMQ block' -j REJECT --reject-with tcp-reset
        sleep 1
    done
    return $OCF_SUCCESS
}

unblock_client_access()
{
    # remove all temporary RMQ blocking rules, if there are more than one exist
    #
    # The listing is only used to count the tagged rules: one delete by
    # rule specification is issued per rule found. The counter is local so
    # that a caller's own "i" is left alone.
    local i
    for i in $(iptables -nvL --wait --line-numbers | awk '/temporary RMQ block/ {print $1}'); do
        iptables --wait -D INPUT -p tcp -m tcp --dport "${OCF_RESKEY_node_port}" -m state --state NEW,RELATED,ESTABLISHED \
            -m comment --comment 'temporary RMQ block' -j REJECT --reject-with tcp-reset
    done
}

get_nodes__base(){
    # $1 - 'nodes' to ask mnesia for db_nodes, 'running' for
    #      running_db_nodes (anything else leaves the info type empty).
    # Prints the node names found in the answer on one line, separated by
    # spaces. If the ctl call fails, prints an empty line and returns
    # OCF_ERR_GENERIC.
    local infotype=''
    local c_status

    case "$1" in
        nodes)   infotype='db_nodes' ;;
        running) infotype='running_db_nodes' ;;
    esac

    if ! c_status=$(${OCF_RESKEY_ctl} eval "mnesia:system_info(${infotype})." 2>/dev/null) ; then
        echo ''
        return $OCF_ERR_GENERIC
    fi

    # translate line like '{running_nodes,['rabbit@node-1','rabbit@node-2','rabbit@node-3']},' to node_list
    echo $(echo "${c_status}" | awk -F, '{ for (i=1;i<=NF;i++) { if ($i ~ /@/) { gsub(/[\[\]}{]/,"",$i); print $i; } }}' | tr -d  "\'")
    return $OCF_SUCCESS
}

get_nodes() {
    # Prints the nodes known to the rabbit cluster db on one line.
    # Returns the status of get_nodes__base. It is captured before the echo,
    # since "$?" read after an echo is always the status of that echo.
    local nodes
    local rc
    nodes=$(get_nodes__base nodes)
    rc=$?
    echo $nodes
    return $rc
}

get_running_nodes() {
    # Prints the currently running rabbit cluster nodes on one line.
    # Returns the status of get_nodes__base. It is captured before the echo,
    # since "$?" read after an echo is always the status of that echo.
    local nodes
    local rc
    nodes=$(get_nodes__base running)
    rc=$?
    echo $nodes
    return $rc
}

# Get all known cluster nodes including offline ones
get_all_pacemaker_nodes()
{
    # Second column of "crm_node -l", joined on one line; entries that are
    # empty or contain "(null)" are left out.
    local names
    names=$(crm_node -l | awk '{print $2}' | awk '$0 != "" && $0 !~ /\(null\)/')
    echo $names
}

# Get alive cluster nodes in visible partition, but the specified one
get_alive_pacemaker_nodes_but()
{
    # $1 - optional node name to leave out.
    # Prints the nodes listed by "crm_node -l -p" on one line. Output
    # lines containing "(null)" are dropped as a whole before that.
    #
    # The excluded name is compared as a whole word and literally, so
    # leaving out "node-1" neither damages "node-10" nor treats dots in
    # the name as regex wildcards.
    local excluded="$1"
    local node
    local alive=''

    for node in $(crm_node -l -p | sed -e '/(null)/d') ; do
        if [ "${node}" = "${excluded}" ]; then
            continue
        fi
        alive="${alive:+${alive} }${node}"
    done
    echo "${alive}"
}

# Get current master. If a parameter is provided,
# do not check node with that name
get_master_name_but()
{
    # All arguments are handed to get_alive_pacemaker_nodes_but; the
    # first node of its answer for which is_master succeeds is printed.
    # Nothing is printed when no such node exists.
    local node
    local candidates

    candidates=$(get_alive_pacemaker_nodes_but "$@")
    for node in ${candidates}
    do
        ocf_log info "${LH} looking if $node is master"
        is_master $node || continue

        ocf_log info "${LH} master is $node"
        echo $node
        break
    done
}

# Returns 0 if we are clustered with the provided node, 1 otherwise
is_clustered_with()
{
    # $1 - host name of the node in question.
    # The rabbit node name is compared literally and as a whole against
    # each running node, the same way check_need_join_to does it. A
    # pattern match would take "rabbit@node-1" for a member as soon as
    # "rabbit@node-10" is running.
    local wanted
    local node

    wanted=$(rabbit_node_name "$1")
    for node in $(get_running_nodes) ; do
        if [ "${node}" = "${wanted}" ]; then
            return 0
        fi
    done
    return 1
}


check_need_join_to() {
    # $1 - host name of the node to join to.
    # Returns 1 when the rabbit node of that host is already among the
    # running nodes, 0 when it is not.
    local join_to
    local node

    join_to=$(rabbit_node_name $1)
    for node in $(get_running_nodes) ; do
        if [ "${join_to}" = "${node}" ] ; then
            return 1
        fi
    done

    return 0
}

# Update erlang cookie, if it has been specified
update_cookie() {
    # When an erlang cookie is configured (anything but 'false'), makes
    # sure the cookie file holds it, is owned by the configured user and
    # group, and has mode 600. Always returns OCF_SUCCESS.
    local cookie_file_content
    if [ "${OCF_RESKEY_erlang_cookie}" != 'false' ] ; then
        if [ -f "${OCF_RESKEY_erlang_cookie_file}" ]; then
            # First line of cookie file; the command substitution already
            # drops the trailing newline, so no extra tool is needed (and
            # a missing one cannot make the comparison below fail).
            cookie_file_content=$(head -n1 "${OCF_RESKEY_erlang_cookie_file}")
        fi
        # As there is a brief period of time when the file is empty
        # (shell redirection has already opened and truncated file,
        # and the write hasn't finished its job), we are doing this write
        # only when cookie has changed.
        if [ "${OCF_RESKEY_erlang_cookie}" != "${cookie_file_content}" ]; then
            # printf writes the value verbatim, whatever characters it
            # holds; the subshell umask keeps a newly created file private
            # from the start.
            ( umask 077 ; printf '%s\n' "${OCF_RESKEY_erlang_cookie}" > "${OCF_RESKEY_erlang_cookie_file}" )
        fi
        # And this are idempotent operations, so we don't have to
        # check any preconditions for running them.
        chown "${OCF_RESKEY_username}:${OCF_RESKEY_groupname}" "${OCF_RESKEY_erlang_cookie_file}"
        chmod 600 "${OCF_RESKEY_erlang_cookie_file}"
    fi
    return $OCF_SUCCESS
}

# Stop rmq beam process by pid and by rabbit node name match. Returns SUCCESS/ERROR
kill_rmq_and_remove_pid() {
    local LH="${LL} kill_rmq_and_remove_pid():"
    local beam_pattern="beam.*${RABBITMQ_NODENAME}"

    # First pass: stop by the pid file (name matching is the fallback
    # there); its result is deliberately not looked at.
    proc_stop "${OCF_RESKEY_pid_file}" "${beam_pattern}" "${OCF_RESKEY_stop_time}"

    # Second pass: stop whatever still matches the rabbit node name; only
    # this result decides the outcome.
    proc_stop none "${beam_pattern}" "${OCF_RESKEY_stop_time}" || return $OCF_ERR_GENERIC
    return $OCF_SUCCESS
}

trim_var(){
    # Joins all arguments with single spaces, drops one trailing space
    # and prints the result. The echo argument is left unquoted, so
    # runs of whitespace collapse as well.
    local joined="$*"
    local trimmed="${joined% }"
    echo $trimmed
}

action_validate() {
    # todo(sv): validate some incoming parameters
    #
    # Runs every OCF_RESKEY_CRM_meta_notify_* variable listed below
    # through trim_var and stores the result back. Always returns
    # OCF_SUCCESS.
    local suffix

    for suffix in \
        post pre start stop \
        start_resource stop_resource active_resource inactive_resource \
        start_uname stop_uname active_uname \
        master_resource master_uname \
        demote_resource demote_uname \
        slave_resource slave_uname \
        promote_resource promote_uname
    do
        # eval only ever sees the fixed names from the list above; the
        # value itself is expanded by the evaluated assignment.
        eval "OCF_RESKEY_CRM_meta_notify_${suffix}=\$(trim_var \$OCF_RESKEY_CRM_meta_notify_${suffix})"
    done
    return $OCF_SUCCESS
}

update_rabbit_start_time_if_rc() {
    # $1 - a return code. Only when it is 0, the 'rabbit-start-time'
    # attribute of this pacemaker node is updated with the current time.
    local rc=$1
    local nowtime

    # anything but 0: nothing to record
    [ $rc -eq 0 ] || return 0

    nowtime="$(now)"
    ocf_log info "${LH} Rabbit app started successfully. Updating start time attribute with ${nowtime}"
    ocf_run crm_attribute -N $THIS_PCMK_NODE -l reboot --name 'rabbit-start-time' --update "${nowtime}"
}

join_to_cluster() {
    # $1 - host name of the node to join.
    # Steps: stop the rabbit app if it runs, run join_cluster against the
    # rabbit node of that host, wait two seconds, start the rabbit app and
    # record the start time. When any step fails, the whole resource is
    # stopped via action_stop and OCF_ERR_GENERIC is returned.
    local node="$1"
    local rmq_node
    local rc=$OCF_ERR_GENERIC
    local LH="${LL} join_to_cluster():"

    ocf_log info "${LH} start."

    rmq_node=$(rabbit_node_name $node)
    ocf_log info "${LH} Joining to cluster by node '${rmq_node}'."

    # the app must be down for join_cluster to work
    get_status rabbit
    rc=$?
    if [ $rc -eq $OCF_SUCCESS ] ; then
        ocf_log info "${LH} rabbitmq app will be stopped."
        if ! stop_rmq_server_app ; then
            ocf_log err "${LH} Can't stop rabbitmq app by stop_app command. Stopping."
            action_stop
            return $OCF_ERR_GENERIC
        fi
    fi

    ocf_log info "${LH} Execute join_cluster with timeout: ${TIMEOUT_ARG}"
    if ! su_rabbit_cmd "${OCF_RESKEY_ctl} join_cluster $rmq_node" ; then
        ocf_log err "${LH} Can't join to cluster by node '${rmq_node}'. Stopping."
        action_stop
        return $OCF_ERR_GENERIC
    fi

    sleep 2
    if ! try_to_start_rmq_app ; then
        ocf_log err "${LH} Can't start RMQ app after join to cluster. Stopping."
        action_stop
        return $OCF_ERR_GENERIC
    fi

    update_rabbit_start_time_if_rc 0
    ocf_log info "${LH} Joined to cluster succesfully."

    ocf_log info "${LH} end."
    return 0
}

# Remove stopped peers from the rabbit cluster as seen by *this* node.
# $1 is a whitespace separated list of node names where the resource is
# being stopped. Always returns OCF_SUCCESS; failures are only logged.
unjoin_nodes_from_cluster() {
    # node names of the nodes where the pcs resource is being stopped
    local nodelist="$1"
    local LH="${LL} unjoin_nodes_from_cluster():"
    # nodes in rabbit cluster db
    local cluster_members
    local host
    local target
    local member
    local attempt

    if ! cluster_members=$(get_nodes) ; then
        # no nodes in node list, nothing to do
        return $OCF_SUCCESS
    fi

    # unjoin all cluster nodes which are being stopped (i.e. received post-stop notify), except *this* node
    # before unjoining the nodes, make sure they were disconnected from *this* node
    for host in $nodelist ; do
        target=$(rabbit_node_name $host)
        [ "${target}" = "${RABBITMQ_NODENAME}" ] && continue

        for member in $cluster_members ; do
            # only nodes present in the cluster db can be forgotten
            [ "${target}" = "${member}" ] || continue

            # disconnect node being unjoined from this node
            ocf_run ${OCF_RESKEY_ctl} eval "disconnect_node(list_to_atom(\"${target}\"))." 2>&1
            if [ $? -eq $OCF_SUCCESS ] ; then
                ocf_log info "${LH} node '${target}' disconnected succesfully."
            else
                ocf_log info "${LH} disconnecting node '${target}' failed."
            fi

            # unjoin node
            # when the rabbit node went down, its status
            # remains 'running' for a while, so few retries are required
            attempt=0
            while [ $attempt -ne 5 ] ; do
                attempt=$((attempt+1))
                is_clustered_with $target || break
                ocf_log info "${LH} the ${target} is alive and cannot be kicked from the cluster yet"
                sleep 10
            done

            ocf_log info "${LH} Execute forget_cluster_node with timeout: ${TIMEOUT_ARG}"
            if su_rabbit_cmd "${OCF_RESKEY_ctl} forget_cluster_node ${target}" ; then
               ocf_log info "${LH} node '${target}' unjoined succesfully."
            else
               ocf_log warn "${LH} unjoining node '${target}' failed."
            fi
        done
    done
    return $OCF_SUCCESS
}

# Stop the RMQ beam server process.
# A graceful 'rabbitmqctl stop' is attempted first (by pidfile when one is
# readable); whatever is left over afterwards is cleaned up by force.
# Returns OCF_SUCCESS when get_status reports a non-zero status afterwards,
# OCF_ERR_GENERIC otherwise.
stop_server_process() {
    local pid
    local beam_found
    local LH="${LL} stop_server_process():"

    if pid=$(cat ${OCF_RESKEY_pid_file}) ; then
        if [ "${pid}" ] ; then
            # Try to stop gracefully by known PID
            ocf_log info "${LH} Execute stop with timeout: ${TIMEOUT_ARG}"
            su_rabbit_cmd "${OCF_RESKEY_ctl} stop ${OCF_RESKEY_pid_file} 2>&1 >> \"${OCF_RESKEY_log_dir}/shutdown_log\""
            [ $? -eq 0 ] && ocf_log info "${LH} RMQ-server process (PID=${pid}) stopped succesfully."
        fi
    else
        # Try to stop without known PID
        ocf_log err "${LH} RMQ-server process PIDFILE was not found!"
        if su_rabbit_cmd "${OCF_RESKEY_ctl} stop 2>&1 >> \"${OCF_RESKEY_log_dir}/shutdown_log\"" ; then
            ocf_log info "${LH} RMQ-server process stopped succesfully, although there was no PIDFILE found."
            ocf_log info "${LH} grant a graceful termintation window ${OCF_RESKEY_stop_time} to end its beam"
            sleep "${OCF_RESKEY_stop_time}"
        else
            kill_rmq_and_remove_pid
        fi
    fi

    # Ensure there is no beam process and pidfile left
    pgrep -f "beam.*${RABBITMQ_NODENAME}" > /dev/null
    beam_found=$?
    if [ -f ${OCF_RESKEY_pid_file} -o $beam_found -eq 0 ] ; then
        ocf_log warn "${LH} The pidfile or beam's still exist, forcing the RMQ-server cleanup"
        kill_rmq_and_remove_pid
    fi

    # Return the actual status: a zero get_status means it is still around
    if get_status ; then
        return $OCF_ERR_GENERIC
    fi
    return $OCF_SUCCESS
}

# Stop the RMQ app while leaving the beam process alone.
# Returns OCF_SUCCESS when the app is (or already was) stopped,
# OCF_ERR_GENERIC otherwise. Log lines use the caller's ${LH} prefix.
stop_rmq_server_app() {
    # if the beam process isn't running, then rabbit app is stopped as well
    if ! get_status ; then
        return $OCF_SUCCESS
    fi

    # stop the app
    ocf_log info "${LH} Execute stop_app with timeout: ${TIMEOUT_ARG}"
    if ! su_rabbit_cmd "${OCF_RESKEY_ctl} stop_app 2>&1 >> \"${OCF_RESKEY_log_dir}/shutdown_log\"" ; then
         ocf_log err "${LH} RMQ-server app cannot be stopped."
         return $OCF_ERR_GENERIC
    fi

    # stop_app reported success; confirm the rabbit app is really gone
    get_status rabbit
    if [ $? -eq $OCF_SUCCESS ] ; then
        ocf_log err "${LH} RMQ-server app cannot be stopped."
        return $OCF_ERR_GENERIC
    fi

    ocf_log info "${LH} RMQ-server app stopped succesfully."
    return $OCF_SUCCESS
}

# Start the Erlang runtime (beam) of RabbitMQ in the background and wait for
# it to come up.
#
# A leftover PID-file is dealt with first: if it points to a live beam
# process that process is sent SIGTERM, if it points to some other live
# process the function gives up, otherwise the stale file is just removed.
# The runtime is considered started once the PID-file holds the PID of a
# live process; this is polled for up to OCF_RESKEY_start_time seconds.
#
# Returns OCF_SUCCESS when the runtime is up, OCF_ERR_GENERIC otherwise.
start_beam_process() {
    local command
    local rc=$OCF_ERR_GENERIC
    local ts_end
    local pf_end
    local pid
    local LH="${LL} start_beam_process():"

    # remove old PID-file if it exists
    if [ -f "${OCF_RESKEY_pid_file}" ] ; then
        ocf_log warn "${LH} found old PID-file '${OCF_RESKEY_pid_file}'."
        pid=$(cat "${OCF_RESKEY_pid_file}")
        if [ -n "${pid}" ] && [ -d "/proc/${pid}" ] ; then
            # Look at the cmdline file directly instead of piping it through
            # ocf_run, so the check never depends on a logging wrapper passing
            # the command's stdout along. stderr is dropped on purpose: the
            # process may exit between the -d test and the read.
            if grep -q 'bin/beam' "/proc/${pid}/cmdline" 2>/dev/null ; then
                ocf_log warn "${LH} found beam process with PID=${pid}, killing...'."
                ocf_run kill -TERM "${pid}"
            else
                ocf_log err "${LH} found unknown process with PID=${pid} from '${OCF_RESKEY_pid_file}'."
                return $OCF_ERR_GENERIC
            fi
        fi
        ocf_run rm -f "${OCF_RESKEY_pid_file}"
    fi

    [ -f /etc/default/rabbitmq-server ] && . /etc/default/rabbitmq-server

    # run beam process
    # NOTE(review): the user name is hard-coded here rather than taken from
    # OCF_RESKEY_username - confirm whether that is intended.
    command="${OCF_RESKEY_binary} >> \"${OCF_RESKEY_log_dir}/startup_log\" 2>/dev/null"
    RABBITMQ_NODE_ONLY=1 su rabbitmq -s /bin/sh -c "${command}"&
    ts_end=$(( $(now) + ${OCF_RESKEY_start_time} ))
    rc=$OCF_ERR_GENERIC
    while [ $(now) -lt ${ts_end} ]; do
        # waiting for normal start of beam
        pid=0
        pf_end=$(( $(now) + 3 ))
        while [ $(now) -lt ${pf_end} ]; do
            # waiting for OCF_RESKEY_pid_file of beam process
            if [ -f "${OCF_RESKEY_pid_file}" ] ; then
                pid=$(cat "${OCF_RESKEY_pid_file}")
                break
            fi
            sleep 1
        done
        # An empty PID (file created but not written yet) must not pass:
        # "/proc/" on its own is always a directory.
        if [ -n "${pid}" ] && [ "${pid}" != "0" ] && [ -d "/proc/${pid}" ] ; then
            rc=$OCF_SUCCESS
            break
        fi
        sleep 2
    done
    if [ $rc -ne $OCF_SUCCESS ]; then
        if [ "${pid}" = "0" ] ; then
            ocf_log warn "${LH} PID-file '${OCF_RESKEY_pid_file}' not found"
        fi
        ocf_log err "${LH} RMQ-runtime (beam) didn't start succesfully (rc=${rc})."
    fi

    return $rc
}

# Ask the node whether plugins have to be loaded.
# The exit status is that of 'rabbitmqctl eval'; the expression raises an
# error (so the status is expected to be non-zero) when loading is needed.
check_plugins() {
  # Check if it's safe to load plugins and if we need to do so. Logic is:
  #   if (EnabledPlugins > 0) and (ActivePlugins == 0) ; then it's safe to load
  # If we have at least one active plugin, then it's not safe to re-load them
  # because plugins:setup() would remove existing dependency plugins in plugins_expand_dir.
  local probe
  probe='{ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file), EnabledPlugins = rabbit_plugins:read_enabled(EnabledFile), ActivePlugins = rabbit_plugins:active(), if length(EnabledPlugins)>0 -> if length(ActivePlugins)==0 -> erlang:error("need_to_load_plugins"); true -> false end; true -> false end.'
  ${OCF_RESKEY_ctl} eval "${probe}"
}

# Load and start the enabled plugins, unless check_plugins succeeds
# (in which case nothing is done and 0 is returned).
# Otherwise returns the exit status of the 'rabbitmqctl eval' call.
load_plugins() {
  if check_plugins ; then
    return 0
  fi

  ${OCF_RESKEY_ctl} eval 'ToBeLoaded = rabbit_plugins:setup(), ok = app_utils:load_applications(ToBeLoaded), StartupApps = app_utils:app_dependency_order(ToBeLoaded,false), app_utils:start_applications(StartupApps).'
  return $?
}

# Print the output of 'rabbit_plugins:active().' as evaluated on the node.
list_active_plugins() {
  local active
  active=$(${OCF_RESKEY_ctl} eval 'rabbit_plugins:active().')
  echo "${active}"
}

# Start the rabbit app, launching the beam runtime first when get_status
# says it is not there.
# $1 - optional path of the log file start_app output is appended to
#      (defaults to ${OCF_RESKEY_log_dir}/startup_log)
# Returns OCF_SUCCESS once start_app and the subsequent 'wait' succeeded
# (a plugin loading failure is only logged), OCF_ERR_GENERIC otherwise.
try_to_start_rmq_app() {
    local startup_log="${1:-${OCF_RESKEY_log_dir}/startup_log}"
    local LH="${LL} try_to_start_rmq_app():"
    local plugins

    get_status
    if [ $? -ne $OCF_SUCCESS ] ; then
        ocf_log info "${LH} RMQ-runtime (beam) not started, starting..."
        start_beam_process
        if [ $? -ne $OCF_SUCCESS ]; then
            ocf_log err "${LH} Failed to start beam - returning from the function"
            return $OCF_ERR_GENERIC
        fi
    fi

    ocf_log info "${LH} begin."
    ocf_log info "${LH} Execute start_app with timeout: ${TIMEOUT_ARG}"
    if ! su_rabbit_cmd "${OCF_RESKEY_ctl} start_app >>${startup_log} 2>&1" ; then
        ocf_log info "${LH} start_app failed."
        return $OCF_ERR_GENERIC
    fi

    ocf_log info "${LH} start_app was successful."
    ocf_log info "${LH} waiting for start to finish with timeout: ${TIMEOUT_ARG}"
    if ! su_rabbit_cmd "${OCF_RESKEY_ctl} wait ${OCF_RESKEY_pid_file}" ; then
         ocf_log err "${LH} RMQ-server app failed to wait for start."
         return $OCF_ERR_GENERIC
    fi

    # Loading enabled modules
    ocf_log info "${LH} start plugins."
    if load_plugins ; then
      plugins=$(list_active_plugins)
      ocf_log info "${LH} Starting plugins: ${plugins}"
    else
      ocf_log info "${LH} Starting plugins: failed."
    fi
    return $OCF_SUCCESS
}

# Perform the initial start check of the RMQ server.
#
# Sequence, as implemented below:
#   1. unblock_client_access, then block_client_access; abort when the
#      latter fails
#   2. start the beam runtime if get_status says it is not running
#   3. start the rabbit app; on success set the minimal master score and
#      stop the app again
#   4. if the app could not be started, reset Mnesia and retry
#      (up to 10 rounds, ending early when reset_mnesia fails)
#   5. unblock_client_access before returning
#
# Returns OCF_SUCCESS when the app could be started and stopped again,
# OCF_ERR_GENERIC otherwise (the beam is killed in that case).
start_rmq_server_app() {
    local rc=$OCF_ERR_GENERIC
    local startup_log="${OCF_RESKEY_log_dir}/startup_log"
    # NOTE(review): startup_output is declared but never used in this function
    local startup_output
    local LH="${LL} start_rmq_server_app():"
    local a

    #We are performing initial start check.
    #We are not ready to provide service.
    #Clients should not have access.


    ocf_log info "${LH} begin."
    # Safe-unblock the rules, if there are any
    unblock_client_access
    # Apply the blocking rule
    block_client_access
    rc=$?
    if [ $rc -eq $OCF_SUCCESS ]; then
      ocf_log info "${LH} blocked access to RMQ port"
    else
      ocf_log err "${LH} cannot block access to RMQ port!"
      return $OCF_ERR_GENERIC
    fi
    # Make sure the beam runtime is there before touching the app
    get_status
    rc=$?
    if [ $rc -ne $OCF_SUCCESS ] ; then
        ocf_log info "${LH} RMQ-runtime (beam) not started, starting..."
        start_beam_process
        rc=$?
        if [ $rc -ne $OCF_SUCCESS ]; then
            # every error exit past this point unblocks client access first
            unblock_client_access
            ocf_log info "${LH} unblocked access to RMQ port"
            return $OCF_ERR_GENERIC
        fi
    fi

    ocf_log info "${LH} RMQ-server app not started, starting..."
    try_to_start_rmq_app "$startup_log"
    rc=$?
    if [ $rc -eq $OCF_SUCCESS ] ; then
        # rabbitmq-server started successfully as master of cluster
        master_score $MIN_MASTER_SCORE
        # the app is stopped again right away; only the start check was wanted
        stop_rmq_server_app
        rc=$?
        if [ $rc -ne 0 ] ; then
            ocf_log err "${LH} RMQ-server app can't be stopped. Beam will be killed."
            kill_rmq_and_remove_pid
            unblock_client_access
            ocf_log info "${LH} unblocked access to RMQ port"
            return $OCF_ERR_GENERIC
        fi
    else
       # error at start RMQ-server
       ocf_log warn "${LH} RMQ-server app can't start without Mnesia cleaning."
       for a in $(seq 1 10) ; do
            # rc stays at the generic error if reset_mnesia fails and we break
            rc=$OCF_ERR_GENERIC
            reset_mnesia || break
            try_to_start_rmq_app "$startup_log"
            rc=$?
            if [ $rc -eq $OCF_SUCCESS ]; then
                stop_rmq_server_app
                rc=$?
                if [ $rc -eq $OCF_SUCCESS ]; then
                    ocf_log info "${LH} RMQ-server app Mnesia cleaned successfully."
                    rc=$OCF_SUCCESS
                    master_score $MIN_MASTER_SCORE
                    break
                else
                    ocf_log err "${LH} RMQ-server app can't be stopped during Mnesia cleaning. Beam will be killed."
                    kill_rmq_and_remove_pid
                    unblock_client_access
                    ocf_log info "${LH} unblocked access to RMQ port"
                    return $OCF_ERR_GENERIC
                fi
            fi
        done
    fi
    # Reached with a generic error when every retry failed or Mnesia
    # could not be reset
    if [ $rc -eq $OCF_ERR_GENERIC ] ; then
         ocf_log err "${LH} RMQ-server can't be started while many tries. Beam will be killed."
         kill_rmq_and_remove_pid
    fi
    ocf_log info "${LH} end."
    unblock_client_access
    ocf_log info "${LH} unblocked access to RMQ port"
    return $rc
}

# Check the status of the rabbit beam process or of an Erlang application
# running inside it.
# $1 - optional application name to look for in the output of
#      rabbit_misc:which_applications() (default: kernel)
# Returns:
#   OCF_SUCCESS      the application is listed as running
#   OCF_NOT_RUNNING  the application is not listed, or the query failed and
#                    no beam process was found either
#   OCF_ERR_GENERIC  the query failed although a beam process is running
get_status() {
    local what="${1:-kernel}"
    local rc=$OCF_NOT_RUNNING
    local LH="${LL} get_status():"
    local body
    local beam_running

    body=$( ${COMMAND_TIMEOUT} ${OCF_RESKEY_ctl} eval 'rabbit_misc:which_applications().' 2>&1 )
    rc=$?

    pgrep -f "beam.*${RABBITMQ_NODENAME}" > /dev/null
    beam_running=$?
    if [ $rc -ne 0 ] ; then
        # report not running only if the which_applications() reported an error AND the beam is not running
        if [ $beam_running -ne 0 ] ; then
            ocf_log info "${LH} failed with code ${rc}. Command output: ${body}"
            return $OCF_NOT_RUNNING
        fi
        # return a generic error, if there were errors and beam is found running
        ocf_log info "${LH} found the beam process running but failed with code ${rc}. Command output: ${body}"
        return $OCF_ERR_GENERIC
    fi

    # which_applications() exited w/o errors: look for the "{<app>," tuple.
    # The app name is matched as a fixed string (-F): "\{" is not a portable
    # way to spell a literal brace in a basic regular expression, and the
    # name must not be interpreted as a pattern.
    if printf '%s\n' "$body" | grep -F -- "{${what}," > /dev/null 2>&1 ; then
        return $OCF_SUCCESS
    fi

    ocf_log info "${LH} app ${what} was not found in command output: ${body}"
    return $OCF_NOT_RUNNING
}

# 'status' action: report whatever get_status returns for the default check.
action_status() {
    get_status
    return $?
}

# Tell whether the node given as $1 carries the 'rabbit-master' attribute
# with the value 'true' in the CIB.
# Returns 0 if it does, 1 otherwise.
is_master() {
    local value
    value=$(crm_attribute -N "${1}" -l reboot --name 'rabbit-master' --query 2>/dev/null \
        | awk '{print $3}' | awk -F "=" '{print $2}' | sed -e '/(null)/d')
    if [ "${value}" = 'true' ] ; then
        return 0
    fi
    return 1
}

# Decide what a su_rabbit_cmd exit code means in terms of timeouts.
#   $1 - exit code of the command (124 and 137 are treated as timeouts)
#   $2 - name of the private attribute counting consecutive timeouts;
#        timeouts of different operations are tracked separately
#   $3 - operation name, used in log messages only
# Returns:
#   0 - not a timeout (the counter is reset to 0)
#   1 - a timeout, but fewer than $OCF_RESKEY_max_rabbitmqctl_timeouts in a
#       row so far: it should be ignored
#   2 - the $OCF_RESKEY_max_rabbitmqctl_timeouts'th timeout in a row:
#       get_monitor should exit with error
# Log lines use the caller's ${LH} prefix.
check_timeouts() {
    local op_rc=$1
    local timeouts_attr_name=$2
    local op_name=$3
    local seen

    [ $op_rc -ne 124 -a $op_rc -ne 137 ] && {
        ocf_update_private_attr $timeouts_attr_name 0
        return 0
    }

    seen=$(ocf_get_private_attr $timeouts_attr_name 0)
    seen=$((seen+1))
    # There is a slight chance that this piece of code will be executed twice simultaneously.
    # As a result, $timeouts_attr_name's value will be one less than it should be. But we don't need
    # precise calculation here.
    ocf_update_private_attr $timeouts_attr_name $seen

    if [ $seen -lt $OCF_RESKEY_max_rabbitmqctl_timeouts ]; then
        ocf_log warn "${LH} 'rabbitmqctl $op_name' timed out $seen of max. $OCF_RESKEY_max_rabbitmqctl_timeouts time(s) in a row. Doing nothing for now."
        return 1
    fi

    ocf_log err "${LH} 'rabbitmqctl $op_name' timed out $seen of max. $OCF_RESKEY_max_rabbitmqctl_timeouts time(s) in a row and is not responding. The resource is failed."
    return 2
}

# Wait until no queue reports the 'syncing' state any more.
# $1 - value handed to su_rabbit_cmd via its -t option
# Returns the exit status of su_rabbit_cmd.
wait_sync() {
  # keep both helpers local so they do not leak into the global scope
  local wait_time=$1
  local queues

  queues="${COMMAND_TIMEOUT} ${OCF_RESKEY_ctl} list_queues name state"
  su_rabbit_cmd -t "${wait_time}" "sh -c \"while ${queues} | grep -q 'syncing,'; \
      do sleep 2; done\""
  return $?
}

# Core of the monitor action.
#
# Steps, in order:
#   1. beam/app status and master detection (exits with OCF_FAILED_MASTER
#      when this node is marked master but the rabbit app is not running)
#   2. for non-masters: honour a recent "restart" order from the master
#   3. recalculate this node's master score from the start times
#   4. when the rabbit app runs: health checks through rabbitmqctl
#      (list_channels, memory alarms, cluster_status, list_queues)
#   5. when this node is a healthy master: order unconnected nodes to restart
#
# Returns OCF_NOT_RUNNING, OCF_SUCCESS, OCF_RUNNING_MASTER or
# OCF_ERR_GENERIC.
get_monitor() {
    local rc=$OCF_ERR_GENERIC
    local LH="${LL} get_monitor():"
    local status_master=1
    local rabbit_running
    local name
    local node
    local node_start_time
    local nowtime
    # time stamps used for the restart-order check; kept local so they
    # do not leak into the global scope
    local start_time
    local restart_order_time

    ocf_log info "${LH} CHECK LEVEL IS: ${OCF_CHECK_LEVEL}"
    get_status
    rc=$?
    if [ $rc -eq $OCF_NOT_RUNNING ] ; then
        ocf_log info "${LH} get_status() returns ${rc}."
        ocf_log info "${LH} ensuring this slave does not get promoted."
        master_score 0
        return $OCF_NOT_RUNNING
    elif [ $rc -eq $OCF_SUCCESS ] ; then
        ocf_log info "${LH} get_status() returns ${rc}."
        ocf_log info "${LH} also checking if we are master."
        get_status rabbit
        rabbit_running=$?
        is_master $THIS_PCMK_NODE
        status_master=$?
        ocf_log info "${LH} master attribute is ${status_master}"
        if [ $status_master -eq 0 -a $rabbit_running -eq $OCF_SUCCESS ]
        then
            ocf_log info "${LH} We are the running master"
            rc=$OCF_RUNNING_MASTER
        elif [ $status_master -eq 0 -a $rabbit_running -ne $OCF_SUCCESS ] ; then
            ocf_log err "${LH} We are the master and RMQ-runtime (beam) is not running. this is a failure"
            exit $OCF_FAILED_MASTER
        fi
    fi
    get_status rabbit
    rabbit_running=$?
    ocf_log info "${LH} checking if rabbit app is running"

    if [ $rc -eq $OCF_RUNNING_MASTER ]; then
        if [ $rabbit_running -eq $OCF_SUCCESS ]; then
            ocf_log info "${LH} rabbit app is running and is master of cluster"
        else
            ocf_log err "${LH} we are the master and rabbit app is not running. This is a failure"
            exit $OCF_FAILED_MASTER
        fi
    else
        start_time=$((180 + $(ocf_get_private_attr 'rabbit-start-phase-1-time' 0)))
        restart_order_time=$((60 + $(ocf_get_private_attr 'rabbit-ordered-to-restart' 0)))
        nowtime=$(now)

        # If we started more than 3 minutes ago, and
        # we got order to restart less than 1 minute ago
        if [ $nowtime -lt $restart_order_time ]; then
            if [ $nowtime -gt $start_time ]; then
                ocf_log err "${LH} failing because we have received an order to restart from the master"
                stop_server_process
                rc=$OCF_ERR_GENERIC
            else
                ocf_log warn "${LH} received an order to restart from the master, ignoring it because we have just started"
            fi
        fi
    fi

    if [ $rc -eq $OCF_ERR_GENERIC ]; then
        ocf_log err "${LH} get_status() returns generic error ${rc}"
        ocf_log info "${LH} ensuring this slave does not get promoted."
        master_score 0
        return $OCF_ERR_GENERIC
    fi

    # Recounting our master score
    ocf_log info "${LH} preparing to update master score for node"
    local our_start_time
    local new_score
    local node_score

    our_start_time=$(get_node_start_time $THIS_PCMK_NODE)

    if [ $our_start_time -eq 0 ]; then
        new_score=$MIN_MASTER_SCORE
    else
        # start from the best score and go 10 below every scored node
        # that has an earlier start time than ours
        new_score=$BEST_MASTER_SCORE
        for node in $(get_alive_pacemaker_nodes_but $THIS_PCMK_NODE)
        do
            node_start_time=$(get_node_start_time $node)
            node_score=$(get_node_master_score $node)

            ocf_log info "${LH} comparing us (start time: $our_start_time, score: $new_score) with $node (start time: $node_start_time, score: $node_score)"
            if [ $node_start_time -ne 0 -a $node_score -ne 0 -a $node_start_time -lt $our_start_time ]; then
                new_score=$((node_score - 10 < new_score ? node_score - 10 : new_score ))
            fi
        done
    fi

    if [ "$new_score" -ne "$(get_node_master_score $THIS_PCMK_NODE)" ]; then
        master_score $new_score
    fi
    ocf_log info "${LH} our start time is $our_start_time and score is $new_score"

    # Skip all other checks if rabbit app is not running
    if [ $rabbit_running -ne $OCF_SUCCESS ]; then
        ocf_log info "${LH} RabbitMQ is not running, get_monitor function ready to return ${rc}"
        return $rc
    fi

    # Check if the rabbitmqctl control plane is alive.
    local rc_alive
    local timeout_alive
    su_rabbit_cmd "${OCF_RESKEY_ctl} list_channels 2>&1 > /dev/null"
    rc_alive=$?
    [ $rc_alive -eq 137 -o $rc_alive -eq 124 ] && ocf_log err "${LH} 'rabbitmqctl list_channels' timed out, per-node explanation: $(enhanced_list_channels)"
    check_timeouts $rc_alive "rabbit_list_channels_timeouts" "list_channels"
    timeout_alive=$?

    # check_timeouts: 2 - too many timeouts in a row, 1 - a timeout to be
    # ignored, 0 - no timeout, so the exit code itself is meaningful
    if [ $timeout_alive -eq 2 ]; then
        master_score 0
        return $OCF_ERR_GENERIC
    elif [ $timeout_alive -eq 0 ]; then
        if [ $rc_alive -ne 0 ]; then
            ocf_log err "${LH} rabbitmqctl list_channels exited with errors."
            rc=$OCF_ERR_GENERIC
        fi
    fi

    # Check for memory alarms for this Master or Slave node.
    # If alert found, reset the alarm
    # and restart the resource as it likely means a dead end situation
    # when rabbitmq cluster is running with blocked publishing due
    # to high memory watermark exceeded.
    local alarms
    local rc_alarms
    local timeout_alarms
    alarms=`su_rabbit_cmd "${OCF_RESKEY_ctl} -q eval 'rabbit_alarm:get_alarms().'"`
    rc_alarms=$?
    check_timeouts $rc_alarms "rabbit_get_alarms_timeouts" "get_alarms"
    timeout_alarms=$?

    if [ $timeout_alarms -eq 2 ]; then
        master_score 0
        return $OCF_ERR_GENERIC

    elif [ $timeout_alarms -eq 0 ]; then
        if [ $rc_alarms -ne 0 ]; then
            ocf_log err "${LH} rabbitmqctl get_alarms exited with errors."
            rc=$OCF_ERR_GENERIC

        elif [ -n "${alarms}" ]; then
            # NOTE(review): "${alarms}" is quoted, so this loop body runs once
            # with the whole alarm list - confirm that is intended.
            for node in "${alarms}"; do
                name=`echo ${node} | perl -n -e "m/memory,'(?<n>\S+)+'/ && print \"$+{n}\n\""`
                if [ "${name}" = "${RABBITMQ_NODENAME}" ] ; then
                    ocf_log err "${LH} Found raised memory alarm. Erasing the alarm and restarting."
                    su_rabbit_cmd "${OCF_RESKEY_ctl} set_vm_memory_high_watermark 10 2>&1 > /dev/null"
                    rc=$OCF_ERR_GENERIC
                    break
                fi
            done
        fi
    fi

    if ! is_cluster_status_ok ; then
        rc=$OCF_ERR_GENERIC
    fi

    # Check if the list of all queues is available,
    # Also report some queues stats and total virtual memory.
    local queues
    local rc_queues
    local timeout_queues
    queues=`su_rabbit_cmd "${OCF_RESKEY_ctl} -q list_queues memory messages consumer_utilisation"`
    rc_queues=$?
    check_timeouts $rc_queues "rabbit_list_queues_timeouts" "list_queues"
    timeout_queues=$?

    if [ $timeout_queues -eq 2 ]; then
        master_score 0
        return $OCF_ERR_GENERIC

    elif [ $timeout_queues -eq 0 ]; then
        if [ $rc_queues -ne 0 ]; then
            ocf_log err "${LH} rabbitmqctl list_queues exited with errors."
            rc=$OCF_ERR_GENERIC

        elif [ -n "${queues}" ]; then
            # columns of ${queues}: memory, messages, consumer_utilisation
            local q_c
            q_c=`printf %b "${queues}\n" | wc -l`
            local mem
            mem=`printf %b "${queues}\n" | awk -v sum=0 '{sum+=$1} END {print (sum/1048576)}'`
            local mes
            mes=`printf %b "${queues}\n" | awk -v sum=0 '{sum+=$2} END {print sum}'`
            local c_u
            c_u=`printf %b "${queues}\n" | awk -v sum=0 -v cnt=${q_c} '{sum+=$3} END {print (sum+1)/(cnt+1)}'`
            local status
            status=`echo $(su_rabbit_cmd "${OCF_RESKEY_ctl} -q status")`
            ocf_log info "${LH} RabbitMQ is running ${q_c} queues consuming ${mem}m of ${TOTALVMEM}m total, with ${mes} queued messages, average consumer utilization ${c_u}"
            ocf_log info "${LH} RabbitMQ status: ${status}"
        fi
    fi

    # If we are the master and healthy, check that we see other cluster members
    # Order a member to restart if we don't see it
    if [ $rc -eq $OCF_RUNNING_MASTER ] ; then
        for node in $(get_all_pacemaker_nodes); do
            if ! is_clustered_with $node; then
                nowtime=$(now)

                ocf_log warn "${LH} node $node is not connected with us, ordering it to restart."
                ocf_update_private_attr 'rabbit-ordered-to-restart' "$nowtime" "$node"
            fi
        done
    fi

    ocf_log info "${LH} get_monitor function ready to return ${rc}"
    return $rc
}

# Print the value of a private node attribute as reported by attrd_updater.
#   $1 - attribute name (mandatory)
#   $2 - value printed when the query fails or yields an empty value (mandatory)
#   $3 - node to query (default: $THIS_PCMK_NODE)
ocf_get_private_attr() {
    local attr_name="${1:?}"
    local attr_default_value="${2:?}"
    local nodename="${3:-$THIS_PCMK_NODE}"
    local reply

    if reply=$(attrd_updater -p --name "$attr_name" --node "$nodename" --query) ; then
        # the third field of the reply is taken as value="<something>"
        echo "$reply" | awk -vdef_val="$attr_default_value" '{ gsub(/"/, "", $3); split($3, vals, "="); if (vals[2] != "") print vals[2]; else print def_val }'
    else
        echo $attr_default_value
    fi
}

# Set a private node attribute through attrd_updater.
#   $1 - attribute name (mandatory)
#   $2 - new value (mandatory)
#   $3 - node to update (default: $THIS_PCMK_NODE)
# Returns the status of the ocf_run call.
ocf_update_private_attr() {
    local key="${1:?}"
    local value="${2:?}"
    local target_node="${3:-$THIS_PCMK_NODE}"

    ocf_run attrd_updater -p --name "$key" --node "$target_node" --update "$value"
    return $?
}

# Run a rabbitmqctl command and interpret its exit code with check_timeouts.
#   $1 - rabbitmqctl command (mandatory)
#   $2 - name of the attribute counting this command's timeouts (mandatory)
# Returns:
#   the command's own exit code when it did not time out,
#   0 when it timed out but the timeout is to be ignored,
#   1 when it timed out too many times in a row.
rabbitmqctl_with_timeout_check() {
    local command="${1:?}"
    local timeout_attr_name="${2:?}"
    local rc
    local verdict

    su_rabbit_cmd "${OCF_RESKEY_ctl} $command"
    rc=$?

    check_timeouts $rc $timeout_attr_name "$command"
    verdict=$?

    if [ "$verdict" = 0 ]; then
        return $rc
    elif [ "$verdict" = 1 ]; then
        return 0
    elif [ "$verdict" = 2 ]; then
        return 1
    fi
}

# Succeeds when 'rabbitmqctl cluster_status' passes the timeout-aware check.
# All output of the check is discarded; the caller's log prefix is extended.
is_cluster_status_ok() {
    local LH="${LH}: is_cluster_status_ok:"

    rabbitmqctl_with_timeout_check cluster_status rabbit_cluster_status_timeouts > /dev/null 2>&1
    return $?
}

# 'monitor' action: run get_monitor and return its status.
# With OCF_RESKEY_debug=true the environment and the notify variables are
# appended to log files under /tmp first.
# NOTE(review): those are fixed, predictable /tmp paths - confirm this is
# acceptable for the account the agent runs as.
action_monitor() {
    local rc=$OCF_ERR_GENERIC
    local LH="${LL} monitor:"
    # time stamp for the debug dump; local so it does not leak globally
    local d
    ocf_log debug "${LH} action start."
    if [ "${OCF_RESKEY_debug}" = 'true' ] ; then
        d=$(date '+%Y%m%d %H:%M:%S')
        echo "$d" >> /tmp/rmq-monitor.log
        env >> /tmp/rmq-monitor.log
        echo "$d  [monitor] start='${OCF_RESKEY_CRM_meta_notify_start_uname}' stop='${OCF_RESKEY_CRM_meta_notify_stop_uname}' active='${OCF_RESKEY_CRM_meta_notify_active_uname}' inactive='${OCF_RESKEY_CRM_meta_notify_inactive_uname}'" >> /tmp/rmq-ocf.log
    fi
    get_monitor
    rc=$?
    ocf_log debug "${LH} role: ${OCF_RESKEY_CRM_meta_role}"
    ocf_log debug "${LH} result: $rc"
    ocf_log debug "${LH} action end."
    return $rc
}


# 'start' action.
# Does nothing (OCF_SUCCESS) when get_status already reports the beam as
# running. Otherwise the timeout counters are zeroed, the phase 1 start time
# is recorded, the 'rabbit-start-time' and 'rabbit-master' attributes are
# deleted and start_rmq_server_app is run; its status is returned.
# With OCF_RESKEY_debug=true the environment and the notify variables are
# appended to log files under /tmp first.
action_start() {
    local rc=$OCF_ERR_GENERIC
    local LH="${LL} start:"
    local nowtime
    # time stamp for the debug dump; local so it does not leak globally
    local d

    if [ "${OCF_RESKEY_debug}" = 'true' ] ; then
        d=$(date '+%Y%m%d %H:%M:%S')
        echo "$d" >> /tmp/rmq-start.log
        env >> /tmp/rmq-start.log
        echo "$d  [start]  start='${OCF_RESKEY_CRM_meta_notify_start_uname}' stop='${OCF_RESKEY_CRM_meta_notify_stop_uname}' active='${OCF_RESKEY_CRM_meta_notify_active_uname}' inactive='${OCF_RESKEY_CRM_meta_notify_inactive_uname}'" >> /tmp/rmq-ocf.log
    fi

    ocf_log info "${LH} action begin."

    get_status
    rc=$?
    if [ $rc -eq $OCF_SUCCESS ] ; then
        ocf_log warn "${LH} RMQ-runtime (beam) already started."
        return $OCF_SUCCESS
    fi

    # a fresh start begins with clean consecutive-timeout counters
    local attrs_to_zero="rabbit_list_channels_timeouts rabbit_get_alarms_timeouts rabbit_list_queues_timeouts rabbit_cluster_status_timeouts"
    local attr_name_to_reset
    for attr_name_to_reset in $attrs_to_zero; do
        ocf_update_private_attr $attr_name_to_reset 0
    done

    nowtime=$(now)
    ocf_log info "${LH} Setting phase 1 one start time to $nowtime"
    ocf_update_private_attr 'rabbit-start-phase-1-time' "$nowtime"
    ocf_log info "${LH} Deleting start time attribute"
    ocf_run crm_attribute -N $THIS_PCMK_NODE -l reboot --name 'rabbit-start-time' --delete
    ocf_log info "${LH} Deleting master attribute"
    ocf_run crm_attribute -N $THIS_PCMK_NODE -l reboot --name 'rabbit-master' --delete

    ocf_log info "${LH} RMQ going to start."
    start_rmq_server_app
    rc=$?
    if [ $rc -eq $OCF_SUCCESS ] ; then
        ocf_log info "${LH} RMQ prepared for start succesfully."
    fi

    ocf_log info "${LH} action end."
    return $rc
}


# 'stop' action.
# Deletes the 'rabbit-master' and 'rabbit-start-time' attributes, zeroes the
# master score, waits up to half of OCF_RESKEY_stop_time for queues to
# sync and then stops the server process.
# Returns OCF_SUCCESS when get_status reports OCF_NOT_RUNNING afterwards.
# On failure the whole script EXITS with OCF_ERR_GENERIC instead of
# returning.
# With OCF_RESKEY_debug=true the environment and the notify variables are
# appended to log files under /tmp first.
action_stop() {
    local rc=$OCF_ERR_GENERIC
    local LH="${LL} stop:"
    # time stamp for the debug dump; local so it does not leak globally
    local d

    if [ "${OCF_RESKEY_debug}" = 'true' ] ; then
        d=$(date '+%Y%m%d %H:%M:%S')
        echo "$d" >> /tmp/rmq-stop.log
        env >> /tmp/rmq-stop.log
        echo "$d  [stop]  start='${OCF_RESKEY_CRM_meta_notify_start_uname}' stop='${OCF_RESKEY_CRM_meta_notify_stop_uname}' active='${OCF_RESKEY_CRM_meta_notify_active_uname}' inactive='${OCF_RESKEY_CRM_meta_notify_inactive_uname}'" >> /tmp/rmq-ocf.log
    fi

    ocf_log info "${LH} action begin."

    ocf_log info "${LH} Deleting master attribute"
    ocf_run crm_attribute -N $THIS_PCMK_NODE -l reboot --name 'rabbit-master' --delete
    master_score 0
    ocf_log info "${LH} Deleting start time attribute"
    ocf_run crm_attribute -N $THIS_PCMK_NODE -l reboot --name 'rabbit-start-time' --delete

    # Wait for synced state first
    ocf_log info "${LH} waiting $((OCF_RESKEY_stop_time/2)) to sync"
    wait_sync $((OCF_RESKEY_stop_time/2))

    ocf_log info "${LH} RMQ-runtime (beam) going to down."
    stop_server_process
    # Fail early without additional rabbitmqctl invocations
    if [ $? -ne $OCF_SUCCESS ] ; then
        ocf_log err "RMQ-runtime (beam) couldn't be stopped and will likely became unmanaged. Take care of it manually!"
        ocf_log info "${LH} action end."
        exit $OCF_ERR_GENERIC
    fi

    # Ensure the actual status to be returned
    get_status
    if [ $? -eq $OCF_NOT_RUNNING ] ; then
        ocf_log info "${LH} RMQ-runtime (beam) not running."
        ocf_log info "${LH} action end."
        return $OCF_SUCCESS
    else
        ocf_log err "RMQ-runtime (beam) couldn't be stopped and will likely became unmanaged. Take care of it manually!"
        ocf_log info "${LH} action end."
        exit $OCF_ERR_GENERIC
    fi

}

#######################################################################
# Enhanced list_channels:
# - nodes are processed in parallel
# - report contains information about which nodes timed out
#
# 'list_channels' is used as a health-check for the current node, but it
# actually checks the overall health of all nodes in the cluster. And there
# were some bugs where only one (non-local) channel became stuck, but the OCF
# script was wrongfully killing the local node.
#
# Hopefully all such bugs are fixed, but if not - it will allow to
# detect such conditions.
#
# Somewhat strange implementation is due to the following reasons:
# - ability to support older versions of RabbitMQ which have reached
#   end-of-life with single version of the script
# - zero dependencies - for older versions this functionality could be
#   implemented as a plugin, but it'll require this plugin installation
enhanced_list_channels() {
    # Runs an Erlang snippet through "${OCF_RESKEY_ctl} eval" that probes the
    # channels of every running cluster node in parallel and yields one
    # {Node, Status, OkChannelsCount} tuple per node:
    #   - Status is 'ok' when every channel of the node answered,
    #   - Status is the error term of the failed rpc call otherwise,
    #   - Status is 'no_data_collected' for nodes that did not report
    #     before the time budget ran out.
    # The output and exit status are whatever su_rabbit_cmd produces.
    #
    # Globals read: TIMEOUT_ARG (defaults to 5), OCF_RESKEY_ctl

    # One second less than timeout of su_rabbit_cmd
    local timeout=$((${TIMEOUT_ARG:-5} - 1))

    # The heredoc delimiter is unquoted, so the shell substitutes $timeout
    # into the Erlang source below before it is sent on stdin.
    # 'xargs -0' splits its input on NUL bytes only; as the snippet has none,
    # the whole text becomes a single argument of the eval command
    # (presumably su_rabbit_cmd forwards its stdin to xargs - verify there).
    su_rabbit_cmd "xargs -0 ${OCF_RESKEY_ctl} eval" <<EOF
SecondsToCompletion = $timeout,

%% Milliseconds since unix epoch
Now = fun() ->
              {Mega, Secs, Micro} = os:timestamp(),
              Mili = Micro div 1000,
              Mili + 1000 * (Secs + 1000000 * Mega)
      end,

%% We shouldn't continue execution past this time
ShouldEndAt = Now() + SecondsToCompletion * 1000,

%% How many milliseconds we still have
Timeout = fun() ->
                  case ShouldEndAt - Now() of
                      Past when Past =< 0 ->
                          0;
                      Timeout ->
                          Timeout
                  end
          end,

%% Lambda combinator - for defining anonymous recursive functions
Y = fun(F) ->
            (fun (X) -> F(fun(Y) -> (X(X))(Y) end) end)(
              fun (X) -> F(fun(Y) -> (X(X))(Y) end) end)
    end,

Parent = self(),

ListChannels = Y(fun(Rec) ->
                         fun (({Node, [], OkChannelsCount})) ->
                                 Parent ! {Node, ok, OkChannelsCount};
                             ({Node, [Chan|Rest], OkChannelsCount}) ->
                                 case catch rpc:call(Node, rabbit_channel, info, [Chan], Timeout()) of
                                     Infos when is_list(Infos) ->
                                         Rec({Node, Rest, OkChannelsCount + 1});
                                     {badrpc, {'EXIT', {noproc, _}}} ->
                                         %% Channel became dead before we could request it's status, don't care
                                         Rec({Node, Rest, OkChannelsCount});
                                     Err ->
                                         Parent ! {Node, Err, OkChannelsCount}
                                 end
                         end
                 end),

SingleNodeListing = fun(Node) ->
                            case catch rpc:call(Node, pg_local, get_members, [rabbit_channels], Timeout()) of
                                LocalChannels when is_list(LocalChannels) ->
                                    ListChannels({Node, LocalChannels, 0});
                                Err ->
                                    Parent ! {Node, Err, 0}
                            end
                    end,

AllNodes = rabbit_mnesia:cluster_nodes(running),
[ spawn(fun() -> SingleNodeListing(Node) end) || Node <- AllNodes ],

WaitForNodes = Y(fun(Rec) ->
                  fun ({[], Acc}) ->
                          Acc;
                      ({RemainingNodes, Acc}) ->
                          receive
                              {Node, _Status, _ChannelCount} = Smth ->
                                  RemainingNodes1 = lists:delete(Node, RemainingNodes),
                                  Rec({RemainingNodes1, [Smth|Acc]})
                              after Timeout() + 100 ->
                                      Acc
                              end
                  end
          end),

Result = WaitForNodes({AllNodes, []}),

ExpandedResult = [ case lists:keysearch(Node, 1, Result) of
                       {value, NodeResult} ->
                           NodeResult;
                       false ->
                           {Node, no_data_collected, 0}
                   end || Node <- AllNodes ],

ExpandedResult.
EOF
}

#######################################################################
# Join the cluster through the given node.
#
# Arguments:
#   $1 - node to join to (passed to my_host as a single host list)
#
# Returns:
#   - the status of my_host unchanged, if no join is attempted because
#     my_host reported the destination to be this very node (status 0)
#     or because the destination is empty;
#   - OCF_SUCCESS, if joined;
#   - OCF_ERR_GENERIC, if cannot join (the mnesia is reset in that case).
jjj_join () {
    local join_to="$1"
    local rc=$OCF_ERR_GENERIC
    local LH="${LL} jjj_join:"

    # Keep the destination a single word: the other callers in this file
    # hand my_host its (possibly multi-host) list as one quoted argument,
    # so an unquoted expansion would make it look at the first host only.
    my_host "${join_to}"
    rc=$?
    ocf_log debug "${LH} node='${join_to}' rc='${rc}'"

    # Nothing to do when we would be joining to ourselves
    # or when the master host is not given
    if [ "$rc" -eq 0 ] || [ -z "${join_to}" ] ; then
        return $rc
    fi

    ocf_log info "${LH} Joining to cluster by node '${join_to}'"
    join_to_cluster "${join_to}"
    rc=$?
    if [ "$rc" -ne "$OCF_SUCCESS" ] ; then
        ocf_log err "${LH} Failed to join the cluster. The mnesia will be reset."
        reset_mnesia
        # Whatever join_to_cluster returned, report a generic failure
        rc=$OCF_ERR_GENERIC
    fi
    return $rc
}

# Handle the 'notify' action. Only post-notifications are acted upon:
#
#   post-promote - unless this node is the promoted one, make sure it is
#                  clustered with the new master: start the app if it is
#                  already clustered, join through jjj_join otherwise
#   post-start   - if this node is among the started/active nodes and a
#                  master is reported, join the master (or just start the
#                  app when already clustered) and import the definitions
#                  dump, if such a non-empty file exists
#   post-stop    - on the node being stopped reset the master score, on
#                  all the other nodes unjoin the stopped node(s)
#
# Globals read: OCF_RESKEY_CRM_meta_notify_*, OCF_RESKEY_debug,
#   OCF_RESKEY_definitions_dump_file, OCF_RESKEY_admin_user,
#   OCF_RESKEY_admin_password, OCF_RESKEY_host_ip, OCF_RESKEY_stop_time
#
# Returns:
#   OCF_ERR_GENERIC - jjj_join failed with OCF_ERR_GENERIC on post-promote,
#                     the join/app start reported OCF_ERR_GENERIC on
#                     post-start, or post-stop came without stopped nodes;
#   the jjj_join status on post-promote, when a join was attempted;
#   OCF_SUCCESS in all the other cases.
action_notify() {
    local rc_join=$OCF_SUCCESS
    local rc=$OCF_ERR_GENERIC
    local rc2=$OCF_ERR_GENERIC
    local LH="${LL} notify:"
    local nodes_list
    local d

    if [ "${OCF_RESKEY_debug}" = 'true' ] ; then
        d=$(date '+%Y%m%d %H:%M:%S')
        echo "$d" >> /tmp/rmq-notify.log
        env >> /tmp/rmq-notify.log
        echo "$d  [notify]  ${OCF_RESKEY_CRM_meta_notify_type}-${OCF_RESKEY_CRM_meta_notify_operation} promote='${OCF_RESKEY_CRM_meta_notify_promote_uname}' demote='${OCF_RESKEY_CRM_meta_notify_demote_uname}' master='${OCF_RESKEY_CRM_meta_notify_master_uname}' slave='${OCF_RESKEY_CRM_meta_notify_slave_uname}' start='${OCF_RESKEY_CRM_meta_notify_start_uname}' stop='${OCF_RESKEY_CRM_meta_notify_stop_uname}' active='${OCF_RESKEY_CRM_meta_notify_active_uname}' inactive='${OCF_RESKEY_CRM_meta_notify_inactive_uname}'" >> /tmp/rmq-ocf.log
    fi

    # Nothing is done for anything but post-notifications
    if [ "${OCF_RESKEY_CRM_meta_notify_type}" != 'post' ] ; then
        return $OCF_SUCCESS
    fi

    # POST- anything notify section
    case "$OCF_RESKEY_CRM_meta_notify_operation" in
        promote)
            ocf_log info "${LH} post-promote begin."

            rc=$OCF_SUCCESS

            # Do nothing, if the list of nodes being promoted reported empty.
            # Delegate recovery, if needed, to the "running out of the cluster" monitor's logic
            if [ -z "${OCF_RESKEY_CRM_meta_notify_promote_uname}" ] ; then
                ocf_log warn "${LH} there are no nodes to join to reported on post-promote. Nothing to do."

            elif my_host "${OCF_RESKEY_CRM_meta_notify_promote_uname}"; then
                ocf_log info "${LH} ignoring post-promote of self"

            elif is_clustered_with "${OCF_RESKEY_CRM_meta_notify_promote_uname}"; then
                if get_status rabbit; then
                    ocf_log info "${LH} we are already clustered with master - ${OCF_RESKEY_CRM_meta_notify_promote_uname}. Nothing to do."
                else
                    ocf_log info "${LH} we are already clustered with master - ${OCF_RESKEY_CRM_meta_notify_promote_uname}. We only need to start the app."

                    # The outcome of the app start only feeds the start time
                    # update, the notification itself still reports success
                    try_to_start_rmq_app
                    rc2=$?
                    update_rabbit_start_time_if_rc $rc2
                fi

            else
                # Note, this should fail when the mnesia is inconsistent.
                # For example, when the "old" master is processing the promotion of the new one.
                # Later this ex-master node will rejoin the cluster at post-start.
                jjj_join "${OCF_RESKEY_CRM_meta_notify_promote_uname}"
                rc=$?
                if [ $rc -eq $OCF_ERR_GENERIC ] ; then
                    ocf_log err "${LH} Failed to join the cluster on post-promote. The resource will be restarted."
                fi
            fi

            ocf_log info "${LH} post-promote end."
            return $rc
            ;;
        start)
            ocf_log info "${LH} post-start begin."
            nodes_list="${OCF_RESKEY_CRM_meta_notify_start_uname} ${OCF_RESKEY_CRM_meta_notify_active_uname}"
            # Do nothing, if the list of nodes being started or running reported empty
            # Delegate recovery, if needed, to the "running out of the cluster" monitor's logic
            # (nodes_list itself always holds the separating space, hence
            # the emptiness check is done on its two parts)
            if [ -z "${OCF_RESKEY_CRM_meta_notify_start_uname}${OCF_RESKEY_CRM_meta_notify_active_uname}" ] ; then
                ocf_log warn "${LH} I'm a last man standing and I must survive!"
                ocf_log info "${LH} post-start end."
                return $OCF_SUCCESS
            fi
            # check whether this event concerns this host
            my_host "${nodes_list}"
            rc=$?
            # Do nothing, if there is no master reported
            # Delegate recovery, if needed, to the "running out of the cluster" monitor's logic
            if [ -z "${OCF_RESKEY_CRM_meta_notify_master_uname}" ] ; then
                ocf_log warn "${LH} there are no nodes to join to reported on post-start. Nothing to do."
                ocf_log info "${LH} post-start end."
                return $OCF_SUCCESS
            fi
            if [ $rc -eq $OCF_SUCCESS ] ; then
                # Now we need to:
                # a. join to the cluster if we are not joined yet
                # b. start the RabbitMQ application, which is always
                #    stopped after start action finishes
                check_need_join_to ${OCF_RESKEY_CRM_meta_notify_master_uname}
                rc_join=$?
                if [ $rc_join -eq $OCF_SUCCESS ]; then
                    ocf_log warn "${LH} Going to join node ${OCF_RESKEY_CRM_meta_notify_master_uname}"
                    jjj_join "${OCF_RESKEY_CRM_meta_notify_master_uname}"
                    rc2=$?
                else
                    ocf_log warn "${LH} We are already clustered with node ${OCF_RESKEY_CRM_meta_notify_master_uname}"

                    try_to_start_rmq_app
                    rc2=$?
                    update_rabbit_start_time_if_rc $rc2
                fi
                ocf_log info "${LH} post-start end."
                if [ -s "${OCF_RESKEY_definitions_dump_file}" ] ; then
                    ocf_log info "File ${OCF_RESKEY_definitions_dump_file} exists"
                    # NOTE(review): the admin credentials travel on the curl
                    # command line and so may show up in the process list
                    ocf_run curl --silent --show-error --request POST --user "${OCF_RESKEY_admin_user}:${OCF_RESKEY_admin_password}" "${OCF_RESKEY_host_ip}:15672/api/definitions" --header "Content-Type:application/json" --data "@${OCF_RESKEY_definitions_dump_file}"
                    rc=$?
                    # A failed import is only logged, it does not fail the action
                    if [ $rc -eq $OCF_SUCCESS ] ; then
                        ocf_log info "RMQ definitions have imported succesfully."
                    else
                        ocf_log err "RMQ definitions have not imported."
                    fi
                fi
                if [ $rc2 -eq $OCF_ERR_GENERIC ] ; then
                    ocf_log warn "${LH} Failed to join the cluster on post-start. The resource will be restarted."
                    ocf_log info "${LH} post-start end."
                    return $OCF_ERR_GENERIC
                fi
            fi
            ;;
        stop)
            # if rabbitmq-server stops on any another node, we should remove it from cluster (as ordinary operation)
            ocf_log info "${LH} post-stop begin."
            # Report a failure, if there are no nodes being stopped reported
            if [ -z "${OCF_RESKEY_CRM_meta_notify_stop_uname}" ] ; then
                ocf_log warn "${LH} there are no nodes being stopped reported on post-stop. The resource will be restarted."
                ocf_log info "${LH} post-stop end."
                return $OCF_ERR_GENERIC
            fi
            my_host "${OCF_RESKEY_CRM_meta_notify_stop_uname}"
            rc=$?
            if [ $rc -ne $OCF_SUCCESS ] ; then
                # Wait for synced state first
                ocf_log info "${LH} waiting $((OCF_RESKEY_stop_time/2)) to sync"
                wait_sync $((OCF_RESKEY_stop_time/2))
                # On other nodes processing the post-stop, make sure the stopped node will be forgotten
                unjoin_nodes_from_cluster "${OCF_RESKEY_CRM_meta_notify_stop_uname}"
            else
                # On the nodes being stopped, reset the master score
                ocf_log info "${LH} resetting the master score."
                master_score 0
            fi
            # always returns OCF_SUCCESS
            ocf_log info "${LH} post-stop end."
            ;;
        *)  ;;
    esac

    return $OCF_SUCCESS
}


# Handle the 'promote' action.
#
# The current state is taken from get_monitor:
#   OCF_SUCCESS        - running as slave: the 'rabbit-master' node
#                        attribute is set and, if the rabbit app is not
#                        running, the RMQ runtime and the app are started,
#                        the policy file (if present) is sourced and the
#                        master state is verified with get_monitor again
#   OCF_RUNNING_MASTER - already a master, nothing to do
#   OCF_FAILED_MASTER  - the script exits with OCF_FAILED_MASTER
#   OCF_NOT_RUNNING    - reported back as is
#   anything else      - the script exits with that status
#
# Globals read: OCF_RESKEY_debug, OCF_RESKEY_policy_file, THIS_PCMK_NODE
#
# Returns: OCF_SUCCESS, OCF_NOT_RUNNING, or OCF_ERR_GENERIC when the RMQ
# runtime cannot be started. Note that the failed master cases do not
# return: they terminate the script with 'exit'.
action_promote() {
    local rc=$OCF_ERR_GENERIC
    local LH="${LL} promote:"
    local d

    if [ "${OCF_RESKEY_debug}" = 'true' ] ; then
        d=$(date '+%Y%m%d %H:%M:%S')
        echo "$d" >> /tmp/rmq-promote.log
        env >> /tmp/rmq-promote.log
        echo "$d  [promote]  start='${OCF_RESKEY_CRM_meta_notify_start_uname}' stop='${OCF_RESKEY_CRM_meta_notify_stop_uname}' active='${OCF_RESKEY_CRM_meta_notify_active_uname}' inactive='${OCF_RESKEY_CRM_meta_notify_inactive_uname}'" >> /tmp/rmq-ocf.log
    fi

    ocf_log info "${LH} action begin."

    get_monitor
    rc=$?
    ocf_log info "${LH} get_monitor returns ${rc}"
    case "$rc" in
        "$OCF_SUCCESS")
            # Running as slave. Normal, expected behavior.
            ocf_log info "${LH} Resource is currently running as Slave"
            # rabbitmqctl start_app if need
            get_status rabbit
            rc=$?
            ocf_log info "${LH} Updating cluster master attribute"
            ocf_run crm_attribute -N "$THIS_PCMK_NODE" -l reboot --name 'rabbit-master' --update 'true'
            if [ "$rc" -ne "$OCF_SUCCESS" ] ; then
                ocf_log info "${LH} RMQ app is not started. Starting..."
                start_rmq_server_app
                rc=$?
                if [ "$rc" -eq 0 ] ; then
                    try_to_start_rmq_app
                    rc=$?
                    if [ "$rc" -ne 0 ] ; then
                        ocf_log err "${LH} Can't start RMQ app. Master resource is failed."
                        ocf_log info "${LH} action end."
                        exit $OCF_FAILED_MASTER
                    fi

                    # The policy file is sourced into this very shell
                    [ -f "${OCF_RESKEY_policy_file}" ] && . "${OCF_RESKEY_policy_file}"

                    update_rabbit_start_time_if_rc $rc

                    ocf_log info "${LH} Checking master status"
                    get_monitor
                    rc=$?
                    ocf_log info "${LH} Master status is $rc"
                    if [ "$rc" -eq "$OCF_RUNNING_MASTER" ] ; then
                        rc=$OCF_SUCCESS
                    else
                        ocf_log err "${LH} Master resource is failed."
                        ocf_log info "${LH} action end."
                        exit $OCF_FAILED_MASTER
                    fi
                else
                    ocf_log err "${LH} Can't start RMQ-runtime."
                    rc=$OCF_ERR_GENERIC
                fi
            fi
            # Falls through to the common "action end" report below
            ;;
        "$OCF_RUNNING_MASTER")
            # Already a master. Unexpected, but not a problem.
            ocf_log warn "${LH} Resource is already running as Master"
            rc=$OCF_SUCCESS
            ;;

        "$OCF_FAILED_MASTER")
            # Master failed.
            ocf_log err "${LH} Master resource is failed and not running"
            ocf_log info "${LH} action end."
            exit $OCF_FAILED_MASTER
            ;;

        "$OCF_NOT_RUNNING")
            # Currently not running.
            ocf_log err "${LH} Resource is currently not running"
            rc=$OCF_NOT_RUNNING
            ;;
        *)
            # Failed resource. Let the cluster manager recover.
            ocf_log err "${LH} Unexpected error, cannot promote"
            ocf_log info "${LH} action end."
            exit $rc
            ;;
    esac

    ocf_log info "${LH} action end."
    return $rc
}


# Handle the 'demote' action: drop the 'rabbit-master' node attribute of
# this node. The outcome of the attribute removal is not checked, the
# action always reports OCF_SUCCESS.
action_demote() {
    local LH
    LH="${LL} demote:"

    ocf_log info "${LH} action begin."

    # Mirror of what the promote action sets for this Pacemaker node
    ocf_run crm_attribute -N $THIS_PCMK_NODE -l reboot --name 'rabbit-master' --delete

    ocf_log info "${LH} action end."

    return $OCF_SUCCESS
}
#######################################################################

rmq_setup_env

# Informational actions are served before any validation takes place
case "$1" in
  meta-data)    meta_data
                exit $OCF_SUCCESS;;
  usage|help)   usage
                exit $OCF_SUCCESS;;
esac

# Anything except meta-data and help must pass validation
action_validate || exit $?

# What kind of method was invoked?
# The exit status of the script is the status of the invoked action.
case "$1" in
  start)        action_start;;
  stop)         action_stop;;
  status)       action_status;;
  monitor)      action_monitor;;
  validate)     action_validate;;
  promote)      action_promote;;
  demote)       action_demote;;
  notify)       action_notify;;
  validate-all) action_validate;;
  # An unknown action must not be reported as a success to the caller
  *)            usage
                exit $OCF_ERR_UNIMPLEMENTED;;
esac
###
